kapsikkum-unmanic – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | #!/usr/bin/env python3 |
2 | # -*- coding: utf-8 -*- |
||
3 | |||
4 | """ |
||
5 | unmanic.installation_link.py |
||
6 | |||
7 | Written by: Josh.5 <jsunnex@gmail.com> |
||
8 | Date: 28 Oct 2021, (7:24 PM) |
||
9 | |||
10 | Copyright: |
||
11 | Copyright (C) Josh Sunnex - All Rights Reserved |
||
12 | |||
13 | Permission is hereby granted, free of charge, to any person obtaining a copy |
||
14 | of this software and associated documentation files (the "Software"), to deal |
||
15 | in the Software without restriction, including without limitation the rights |
||
16 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||
17 | copies of the Software, and to permit persons to whom the Software is |
||
18 | furnished to do so, subject to the following conditions: |
||
19 | |||
20 | The above copyright notice and this permission notice shall be included in all |
||
21 | copies or substantial portions of the Software. |
||
22 | |||
23 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
||
24 | EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
||
25 | MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
||
26 | IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, |
||
27 | DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR |
||
28 | OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE |
||
29 | OR OTHER DEALINGS IN THE SOFTWARE. |
||
30 | |||
31 | """ |
||
32 | import json |
||
33 | import os.path |
||
34 | import queue |
||
35 | import threading |
||
36 | import time |
||
37 | |||
38 | import requests |
||
39 | from requests.auth import HTTPBasicAuth |
||
40 | from requests_toolbelt import MultipartEncoder |
||
41 | |||
42 | from unmanic import config |
||
43 | from unmanic.libs import common, session, task, unlogger |
||
44 | from unmanic.libs.library import Library |
||
45 | from unmanic.libs.session import Session |
||
46 | from unmanic.libs.singleton import SingletonType |
||
47 | |||
48 | |||
class RequestHandler:
    """
    Small wrapper around the ``requests`` verbs that attaches optional HTTP basic auth.

    Keyword arguments read by the constructor:

    :param auth: Name of the authentication scheme. Only 'basic' (any letter case) has an effect.
    :param username: Username for basic auth. Falsy values (including None) are stored as ''.
    :param password: Password for basic auth. Falsy values (including None) are stored as ''.
    """

    def __init__(self, *args, **kwargs):
        self.auth = kwargs.get('auth', '')
        # Callers may explicitly pass None for these, so fall back to an empty string
        self.username = kwargs.get('username') or ''
        self.password = kwargs.get('password') or ''

    def __get_request_auth(self):
        """Return a HTTPBasicAuth object when basic auth is configured, otherwise None."""
        if not self.auth or self.auth.lower() != 'basic':
            return None
        return HTTPBasicAuth(self.username, self.password)

    def get(self, url, **kwargs):
        """Issue a GET request to *url*; extra keyword arguments are handed to ``requests.get``."""
        credentials = self.__get_request_auth()
        return requests.get(url, auth=credentials, **kwargs)

    def post(self, url, **kwargs):
        """Issue a POST request to *url*; extra keyword arguments are handed to ``requests.post``."""
        credentials = self.__get_request_auth()
        return requests.post(url, auth=credentials, **kwargs)

    def delete(self, url, **kwargs):
        """Issue a DELETE request to *url*; extra keyword arguments are handed to ``requests.delete``."""
        credentials = self.__get_request_auth()
        return requests.delete(url, auth=credentials, **kwargs)
||
76 | |||
77 | |||
78 | class Links(object, metaclass=SingletonType): |
||
79 | _network_transfer_lock = {} |
||
80 | |||
def __init__(self, *args, **kwargs):
    """
    Create the handles used by the other methods: the application settings,
    the session object and a logger named after this class.
    """
    self.settings = config.Config()
    self.session = session.Session()
    self.logger = unlogger.UnmanicLogger.__call__().get_logger(__class__.__name__)
||
86 | |||
def _log(self, message, message2='', level="info"):
    """
    Format a message and send it to the logger.

    :param message: Primary message text
    :param message2: Additional detail passed to ``common.format_message`` together with *message*
    :param level: Name of the logger method to call (e.g. 'info', 'warning', 'error', 'debug')
    :return:
    """
    formatted = common.format_message(message, message2)
    log_method = getattr(self.logger, level)
    log_method(formatted)
||
90 | |||
def __format_address(self, address: str):
    """
    Normalise an installation address into a base URL.

    Surrounding whitespace is removed, "http://" is prepended when the address
    carries no http/https scheme, and trailing slashes are stripped.

    :param address: Address as entered in the link configuration
    :return: The normalised address string
    """
    # Strip all surrounding whitespace
    address = address.strip()
    # Only treat the address as already having a scheme if it really starts with one.
    # Checking for a bare 'http' prefix would wrongly skip hostnames such as 'httpserver.lan'.
    if not address.lower().startswith(('http://', 'https://')):
        address = "http://{}".format(address)
    # Strip any trailing slashes so endpoints can be appended directly
    return address.rstrip('/')
||
100 | |||
def __merge_config_dicts(self, config_dict, compare_dict):
    """
    Copy changed values from *compare_dict* into *config_dict* in place.

    Only keys that already exist in *config_dict* are considered. A value is
    applied when it is not None and differs from the current one. Every applied
    change also stamps ``config_dict['last_updated']`` with the current time.

    :param config_dict: Dict to update (mutated)
    :param compare_dict: Dict holding the candidate values
    :return:
    """
    # Iterate over a snapshot of the keys: 'last_updated' may be inserted below, and
    # adding a key while iterating the live dict raises RuntimeError.
    for key in list(config_dict):
        candidate = compare_dict.get(key)
        if candidate is None or config_dict.get(key) == candidate:
            continue
        # Apply the new value
        config_dict[key] = candidate
        # Also flag the dict as updated
        config_dict['last_updated'] = time.time()
||
108 | |||
def __generate_default_config(self, config_dict: dict):
    """
    Build a complete link configuration from a possibly partial one.

    Each known field is taken from *config_dict* when present, otherwise the
    default listed below is used. Keys in *config_dict* that are not listed
    here are dropped from the result.

    :param config_dict: Existing (possibly incomplete) link configuration
    :return: New dict containing every known link configuration field
    """
    field_defaults = (
        ("address", '???'),
        ("auth", 'None'),
        ("username", ''),
        ("password", ''),
        ("enable_receiving_tasks", False),
        ("enable_sending_tasks", False),
        ("enable_task_preloading", True),
        ("preloading_count", 2),
        ("enable_checksum_validation", False),
        ("enable_config_missing_libraries", False),
        ("enable_distributed_worker_count", False),
        ("name", '???'),
        ("version", '???'),
        ("uuid", '???'),
        ("available", False),
        ("task_count", 0),
        ("last_updated", time.time()),
    )
    return {field: config_dict.get(field, fallback) for field, fallback in field_defaults}
||
129 | |||
def acquire_network_transfer_lock(self, url, transfer_limit=1, lock_type='send'):
    """
    Try to claim one of the transfer slots for the given installation URL.

    Slots are tracked in ``self._network_transfer_lock`` under the key
    "[<lock_type>-<slot>]-<url>". A slot is free when it has no entry or its
    'expires' timestamp lies in the past. A claimed slot expires after 60 seconds.

    :param url: Address of the installation the transfer is for
    :param transfer_limit: Number of simultaneous transfers allowed (capped at 5)
    :param lock_type: Label distinguishing the kind of transfer (defaults to 'send')
    :return: The slot key on success, or False if every slot is taken
    """
    now = time.time()
    # Never hand out more than 5 slots per installation
    slot_count = min(transfer_limit, 5)
    # NOTE(review): this RLock is created on every call, so it is never shared between
    #   callers and does not appear to serialise concurrent access - confirm intent.
    guard = threading.RLock()
    with guard:
        for slot in range(slot_count):
            slot_key = "[{}-{}]-{}".format(lock_type, slot, url)
            existing = self._network_transfer_lock.get(slot_key, {})
            if existing.get('expires', 0) >= now:
                # Slot is still held
                continue
            # Claim the slot; it will expire in 1 minute
            self._network_transfer_lock[slot_key] = {
                'expires': (now + 60),
            }
            return slot_key
    # Failed to acquire network transfer lock
    return False
||
157 | |||
def release_network_transfer_lock(self, lock_key):
    """
    Free the transfer slot identified by *lock_key*.

    The slot entry is replaced with an empty dict, which makes it read as expired.

    :param lock_key: Key previously returned by acquire_network_transfer_lock
    :return: Always True
    """
    # NOTE(review): as in acquire_network_transfer_lock, this RLock is local to the call
    #   and is therefore not shared with other callers - confirm intent.
    guard = threading.RLock()
    with guard:
        # Expire the lock for this address
        self._network_transfer_lock[lock_key] = {}
    return True
||
170 | |||
def remote_api_get(self, remote_config: dict, endpoint: str, timeout=2):
    """
    Send a GET request to a remote installation's API.

    :param remote_config: Link configuration providing 'address', 'auth', 'username' and 'password'
    :param endpoint: API path appended to the formatted address
    :param timeout: Request timeout in seconds
    :return: Decoded JSON body on HTTP 200, otherwise an empty dict
    """
    handler = RequestHandler(
        auth=remote_config.get('auth'),
        username=remote_config.get('username'),
        password=remote_config.get('password'),
    )
    base_url = self.__format_address(remote_config.get('address'))
    response = handler.get("{}{}".format(base_url, endpoint), timeout=timeout)
    if response.status_code == 200:
        return response.json()
    if response.status_code in [400, 404, 405, 500]:
        # These status codes are expected to carry a JSON error description; log it
        error_body = response.json()
        error_message = "Error while executing GET on remote installation API - {}. Message: '{}'".format(
            endpoint,
            error_body.get('error'))
        self._log(error_message, message2=error_body.get('traceback', []), level='error')
    return {}
||
197 | |||
def remote_api_post(self, remote_config: dict, endpoint: str, data: dict, timeout=2):
    """
    Send a POST request with a JSON body to a remote installation's API.

    :param remote_config: Link configuration providing 'address', 'auth', 'username' and 'password'
    :param endpoint: API path appended to the formatted address
    :param data: Payload sent as the JSON request body
    :param timeout: Request timeout in seconds
    :return: Decoded JSON body on HTTP 200; the decoded error body on 400/404/405/500;
             an empty dict for any other status
    """
    handler = RequestHandler(
        auth=remote_config.get('auth'),
        username=remote_config.get('username'),
        password=remote_config.get('password'),
    )
    base_url = self.__format_address(remote_config.get('address'))
    response = handler.post("{}{}".format(base_url, endpoint), json=data, timeout=timeout)
    if response.status_code == 200:
        return response.json()
    if response.status_code not in [400, 404, 405, 500]:
        return {}
    # Known error codes: log the error and hand the error body back to the caller
    error_body = response.json()
    error_message = "Error while executing POST on remote installation API - {}. Message: '{}'".format(
        endpoint,
        error_body.get('error'))
    self._log(error_message, message2=error_body.get('traceback', []), level='error')
    return error_body
||
226 | |||
def remote_api_post_file(self, remote_config: dict, endpoint: str, path: str):
    """
    Send a file to the remote installation as a multipart upload.

    No timeout is set so the request will continue until closed.

    :param remote_config: Link configuration providing 'address', 'auth', 'username' and 'password'
    :param endpoint: API path appended to the formatted address
    :param path: Path of the local file to upload
    :return: Decoded JSON body on HTTP 200, otherwise an empty dict
    """
    request_handler = RequestHandler(
        auth=remote_config.get('auth'),
        username=remote_config.get('username'),
        password=remote_config.get('password'),
    )
    address = self.__format_address(remote_config.get('address'))
    url = "{}{}".format(address, endpoint)
    # NOTE: If you remove a content type from the upload (text/plain) the file upload fails
    # NOTE2: Passing the open file through requests' own `files=` argument reads the file into
    #   memory before uploading. This is slow and not ideal for devices with small amounts of RAM,
    #   so a MultipartEncoder is used as the request body instead.
    # The file is opened in a context manager so the handle is closed once the upload
    #   has finished, including when the request raises.
    with open(path, 'rb') as upload_file:
        m = MultipartEncoder(fields={'fileName': (os.path.basename(path), upload_file, 'text/plain')})
        res = request_handler.post(url, data=m, headers={'Content-Type': m.content_type})
    if res.status_code == 200:
        return res.json()
    elif res.status_code in [400, 404, 405, 500]:
        json_data = res.json()
        self._log("Error while uploading file to remote installation API - {}. Message: '{}'".format(
            endpoint,
            json_data.get('error')),
            message2=json_data.get('traceback', []), level='error')
    return {}
||
263 | |||
def remote_api_delete(self, remote_config: dict, endpoint: str, data: dict, timeout=2):
    """
    Send a DELETE request with a JSON body to a remote installation's API.

    :param remote_config: Link configuration providing 'address', 'auth', 'username' and 'password'
    :param endpoint: API path appended to the formatted address
    :param data: Payload sent as the JSON request body
    :param timeout: Request timeout in seconds
    :return: Decoded JSON body on HTTP 200, otherwise an empty dict
    """
    handler = RequestHandler(
        auth=remote_config.get('auth'),
        username=remote_config.get('username'),
        password=remote_config.get('password'),
    )
    base_url = self.__format_address(remote_config.get('address'))
    response = handler.delete("{}{}".format(base_url, endpoint), json=data, timeout=timeout)
    if response.status_code == 200:
        return response.json()
    if response.status_code in [400, 404, 405, 500]:
        # These status codes are expected to carry a JSON error description; log it
        error_body = response.json()
        error_message = "Error while executing DELETE on remote installation API - {}. Message: '{}'".format(
            endpoint,
            error_body.get('error'))
        self._log(error_message, message2=error_body.get('traceback', []), level='error')
    return {}
||
291 | |||
def remote_api_get_download(self, remote_config: dict, endpoint: str, path: str):
    """
    Download a file from a remote installation and write it to *path*.

    The response is streamed to disk chunk by chunk. No timeout is passed to the request.

    :param remote_config: Link configuration providing 'address', 'auth', 'username' and 'password'
    :param endpoint: API path appended to the formatted address
    :param path: Destination file path (opened for binary writing)
    :return: True once the download has been written
    :raises requests.exceptions.HTTPError: via raise_for_status() on an error response
    """
    handler = RequestHandler(
        auth=remote_config.get('auth'),
        username=remote_config.get('username'),
        password=remote_config.get('password'),
    )
    base_url = self.__format_address(remote_config.get('address'))
    download_url = "{}{}".format(base_url, endpoint)
    with handler.get(download_url, stream=True) as response:
        # Fail before the destination file is opened if the remote reported an error
        response.raise_for_status()
        with open(path, 'wb') as destination:
            for chunk in response.iter_content(chunk_size=None):
                if not chunk:
                    # Skip empty chunks
                    continue
                destination.write(chunk)
    return True
||
315 | |||
def validate_remote_installation(self, address: str, **kwargs):
    """
    Validate a remote Unmanic installation by querying its API.

    Five requests are made in order (configuration, settings, version, session state and
    pending task count), each with a 2 second timeout. As soon as one of them answers with
    anything other than HTTP 200, an empty dict is returned.

    :param address: Address of the remote installation
    :param kwargs: Optional 'auth', 'username' and 'password' used to authenticate the requests
    :return: Dict with 'system_configuration', 'settings', 'version', 'session' and 'task_count',
             or an empty dict if any request was not successful
    """
    base_url = self.__format_address(address)

    handler = RequestHandler(
        auth=kwargs.get('auth'),
        username=kwargs.get('username'),
        password=kwargs.get('password'),
    )

    def fetch_json(send, url_template, error_template, **request_kwargs):
        # Returns (True, decoded body) on HTTP 200, otherwise (False, None).
        # Responses with a known error status have their JSON error description logged.
        response = send(url_template.format(base_url), timeout=2, **request_kwargs)
        if response.status_code == 200:
            return True, response.json()
        if response.status_code in [400, 404, 405, 500]:
            error_body = response.json()
            self._log(error_template.format(error_body.get('error')),
                      message2=error_body.get('traceback', []), level='error')
        return False, None

    # Fetch config
    ok, system_configuration_data = fetch_json(
        handler.get,
        "{}/unmanic/api/v2/settings/configuration",
        "Error while fetching remote installation config. Message: '{}'")
    if not ok:
        return {}

    # Fetch settings
    ok, settings_data = fetch_json(
        handler.get,
        "{}/unmanic/api/v2/settings/read",
        "Error while fetching remote installation settings. Message: '{}'")
    if not ok:
        return {}

    # Fetch version
    ok, version_data = fetch_json(
        handler.get,
        "{}/unmanic/api/v2/version/read",
        "Error while fetching remote installation version. Message: '{}'")
    if not ok:
        return {}

    # Fetch session state
    ok, session_data = fetch_json(
        handler.get,
        "{}/unmanic/api/v2/session/state",
        "Error while fetching remote installation session state. Message: '{}'")
    if not ok:
        return {}

    # Fetch task count data (only a single record is requested; the total is read from the reply)
    ok, tasks_data = fetch_json(
        handler.post,
        "{}/unmanic/api/v2/pending/tasks",
        "Error while fetching remote installation pending task list. Message: '{}'",
        json={
            "start": 0,
            "length": 1
        })
    if not ok:
        return {}

    session_fields = ("level", "picture_uri", "name", "email", "uuid")
    return {
        'system_configuration': system_configuration_data.get('configuration'),
        'settings': settings_data.get('settings'),
        'version': version_data.get('version'),
        'session': {field: session_data.get(field) for field in session_fields},
        'task_count': int(tasks_data.get('recordsTotal', 0))
    }
||
408 | |||
def update_all_remote_installation_links(self):
    """
    Updates the link status and configuration of linked remote installations.

    For every configured remote installation this:
      - drops duplicate entries (same uuid) and entries without a usable address,
      - validates the installation and refreshes its name, version, uuid and task count,
      - pulls link settings from the remote side when its config is newer,
      - pushes the local link config when the remote side does not report this link as available,
      - optionally pushes library configurations that are missing on the remote side.

    The resulting list is stored in the settings; it is only written to disk when
    something was modified.

    :return: List of updated remote installation link configurations
    """
    # NOTE(review): the nesting under `if installation_data:` was inferred from the exception
    #   handlers that reset 'available' to False - confirm against the canonical source.
    save_settings = False
    installation_id_list = []
    remote_installations = []
    distributed_worker_count_target = self.settings.get_distributed_worker_count_target()
    for local_config in self.settings.get_remote_installations():
        # Ensure address is not added twice by comparing installation IDs
        # Items matching these checks will be skipped over and will not be added to the installation list
        #   that will be re-saved
        if local_config.get('uuid') in installation_id_list and local_config.get('uuid', '???') != '???':
            # Do not update this installation. By doing this it will be removed from the list
            save_settings = True
            continue

        # Ensure the address is something valid
        if not local_config.get('address'):
            save_settings = True
            continue

        # Remove any entries that have an unknown address and uuid
        if local_config.get('address') == '???' and local_config.get('uuid') == '???':
            save_settings = True
            continue

        # Fetch updated data
        installation_data = None
        try:
            installation_data = self.validate_remote_installation(local_config.get('address'),
                                                                  auth=local_config.get('auth'),
                                                                  username=local_config.get('username'),
                                                                  password=local_config.get('password'))
        except Exception as e:
            # Best effort: a failed validation leaves the installation marked as unavailable below.
            # Log it rather than dropping it silently so connection problems can be diagnosed.
            self._log("Failed to validate remote installation '{}'".format(local_config.get('address')),
                      message2=str(e), level='debug')

        # Generate updated configured values
        updated_config = self.__generate_default_config(local_config)
        updated_config["available"] = False
        if installation_data:
            # Mark the installation as available
            updated_config["available"] = True

            # Append the current task count
            updated_config["task_count"] = installation_data.get('task_count', 0)

            merge_dict = {
                "name":    installation_data.get('settings', {}).get('installation_name'),
                "version": installation_data.get('version'),
                "uuid":    installation_data.get('session', {}).get('uuid'),
            }
            self.__merge_config_dicts(updated_config, merge_dict)

            # Fetch the corresponding remote configuration for this local installation
            remote_config = {}
            try:
                remote_config = self.fetch_remote_installation_link_config_for_this(local_config)
            except requests.exceptions.Timeout:
                self._log("Request to fetch remote installation config timed out", level='warning')
                updated_config["available"] = False
            except requests.exceptions.RequestException as e:
                self._log("Request to fetch remote installation config failed", message2=str(e), level='warning')
                updated_config["available"] = False
            except Exception as e:
                self._log("Failed to fetch remote installation config", message2=str(e), level='error')
                updated_config["available"] = False

            # If the remote configuration is newer than this one, use those values
            # The remote installation will do the same and this will synchronise
            remote_link_config = remote_config.get('link_config', {})
            if local_config.get('last_updated', 1) < remote_link_config.get('last_updated', 1):
                # Note that the configuration options are reversed when reading from the remote installation config
                # These items are not synced here:
                #   - enable_task_preloading
                #   - enable_checksum_validation
                #   - enable_config_missing_libraries
                if updated_config["enable_receiving_tasks"] != remote_link_config.get('enable_sending_tasks'):
                    updated_config["enable_receiving_tasks"] = remote_link_config.get('enable_sending_tasks')
                    save_settings = True
                if updated_config["enable_sending_tasks"] != remote_link_config.get('enable_receiving_tasks'):
                    updated_config["enable_sending_tasks"] = remote_link_config.get('enable_receiving_tasks')
                    save_settings = True
                # Update the distributed_worker_count_target
                distributed_worker_count_target = remote_config.get('distributed_worker_count_target', 0)
                # Also sync the last_updated flag
                updated_config['last_updated'] = remote_link_config.get('last_updated')

            # If the remote config is unable to contact this installation (or it does not have a corresponding
            #   config yet) then also push the configuration
            if not remote_link_config.get('available'):
                try:
                    self.push_remote_installation_link_config(updated_config)
                except requests.exceptions.Timeout:
                    self._log("Request to push link config to remote installation timed out", level='warning')
                    updated_config["available"] = False
                except requests.exceptions.RequestException as e:
                    self._log("Request to push link config to remote installation failed", message2=str(e),
                              level='warning')
                    updated_config["available"] = False
                except Exception as e:
                    self._log("Failed to push link config to remote installation", message2=str(e), level='error')
                    updated_config["available"] = False

            # Push library configurations for missing remote libraries (if configured to do so)
            if local_config.get('enable_sending_tasks') and local_config.get('enable_config_missing_libraries'):
                # Fetch remote installation library name list.
                # Guard the request like the other remote calls in this method so that a single
                #   failed request does not abort the refresh of every other installation.
                results = None
                try:
                    results = self.remote_api_get(local_config, '/unmanic/api/v2/settings/libraries')
                except requests.exceptions.Timeout:
                    self._log("Request to fetch remote installation library list timed out", level='warning')
                except requests.exceptions.RequestException as e:
                    self._log("Request to fetch remote installation library list failed", message2=str(e),
                              level='warning')
                except Exception as e:
                    self._log("Failed to fetch remote installation library list", message2=str(e), level='error')

                if results is not None:
                    existing_library_names = [library.get('name') for library in results.get('libraries', [])]
                    # Loop over local libraries and create an import object for each one that is missing
                    for library in Library.get_all_libraries():
                        # Ignore local libraries that are configured for remote only
                        if library.get('enable_remote_only'):
                            continue
                        # Libraries that already exist remotely need nothing done
                        if library.get('name') in existing_library_names:
                            continue
                        # For each of the missing libraries, create a new remote library with that config.
                        # Export library config
                        import_data = Library.export(library.get('id'))
                        # Set library ID to 0 to generate new library from this import
                        import_data['library_id'] = 0
                        # Configure remote library to be for remote files only
                        import_data['library_config']['enable_remote_only'] = True
                        import_data['library_config']['enable_scanner'] = False
                        import_data['library_config']['enable_inotify'] = False
                        # Import library on remote installation
                        self._log("Importing remote library config '{}'".format(library.get('name')),
                                  message2=import_data, level='debug')
                        result = self.import_remote_library_config(local_config, import_data)
                        if result is None:
                            # There was a connection issue of some kind. This was already logged.
                            continue
                        if result.get('success'):
                            self._log("Successfully imported library '{}'".format(library.get('name')),
                                      level='debug')
                            continue
                        self._log("Failed to import library config '{}'".format(library.get('name')),
                                  message2=result.get('error'), level='error')

        # Keep this installation in the list that will be stored
        remote_installations.append(updated_config)

        # Add UUID to list for next loop
        installation_id_list.append(updated_config.get('uuid', '???'))

    # Update installation data. Only save the config to disk if it was modified
    settings_dict = {
        'remote_installations':            remote_installations,
        'distributed_worker_count_target': distributed_worker_count_target
    }
    self.settings.set_bulk_config_items(settings_dict, save_settings=save_settings)

    return remote_installations
||
564 | |||
def read_remote_installation_link_config(self, uuid: str):
    """
    Return the stored link configuration for the remote installation with the given uuid.

    Fields missing from the stored configuration are filled in with default values.

    :param uuid: UUID of the remote installation
    :return: Complete link configuration dict
    :raises Exception: if no configured remote installation has this uuid
    """
    stored_links = self.settings.get_remote_installations()
    matching_link = next((link for link in stored_links if link.get('uuid') == uuid), None)
    if matching_link is None:
        # Ensure we have settings data from the remote installation
        raise Exception("Unable to read installation link configuration.")
    # If not yet configured, set default values before returning
    return self.__generate_default_config(matching_link)
||
579 | |||
def update_single_remote_installation_link_config(self, configuration: dict, distributed_worker_count_target=0):
    """
    Update (or add) the link configuration of a single remote installation and save it to disk.

    The entry whose uuid matches ``configuration['uuid']`` is merged with the provided
    values. If no entry matches, a new one is generated from *configuration*.
    The distributed worker count target is saved together with the list.

    :param configuration: Link configuration values; must contain a 'uuid'
    :param distributed_worker_count_target: Value stored as the distributed worker count target
    :return:
    :raises Exception: if *configuration* carries no uuid
    """
    uuid = configuration.get('uuid')
    if not uuid:
        raise Exception("Updating a single installation link configuration requires a UUID.")

    # Has the distributed worker count target changed compared to what is currently stored?
    stored_target = self.settings.get_distributed_worker_count_target()
    target_changed = int(stored_target) != int(distributed_worker_count_target)

    found_existing = False
    link_list = []
    for stored_link in self.settings.get_remote_installations():
        link_config = self.__generate_default_config(stored_link)

        # If this is the uuid in the config provided, then update our config with the provided values
        if stored_link.get('uuid') == uuid:
            found_existing = True
            self.__merge_config_dicts(link_config, configuration)

            # If this link is configured for distributed worker count, and that count was changed,
            #   force the last update flag to be updated so this change is disseminated
            # NOTE(review): nesting of this check inside the uuid match was inferred - confirm.
            if target_changed and configuration.get('enable_distributed_worker_count'):
                link_config['last_updated'] = time.time()

        link_list.append(link_config)

    # If the config does not yet exist, then add it now
    if not found_existing:
        link_list.append(self.__generate_default_config(configuration))

    # Update installation data and save the config to disk
    self.settings.set_bulk_config_items(
        {
            'remote_installations':            link_list,
            'distributed_worker_count_target': distributed_worker_count_target
        },
        save_settings=True)
||
624 | |||
def delete_remote_installation_link_config(self, uuid: str):
    """
    Remove the link configuration of the remote installation with the given uuid.

    The remaining list is always written back and saved to disk, whether or not
    a matching entry was found.

    :param uuid: UUID of the remote installation to remove
    :return: True if at least one entry was removed, False if no uuid matched
    """
    kept_links = []
    removed = False
    for link in self.settings.get_remote_installations():
        if link.get('uuid') != uuid:
            # Only keep remote installations that do not match
            kept_links.append(link)
        else:
            # Mark the request as having successfully removed the installation
            removed = True

    # Update installation data and save the config to disk
    self.settings.set_bulk_config_items({'remote_installations': kept_links}, save_settings=True)
    return removed
||
649 | |||
def fetch_remote_installation_link_config_for_this(self, remote_config: dict):
    """
    Ask a remote installation for the link configuration it holds for this installation.

    The request posts this installation's session uuid to the remote
    '/unmanic/api/v2/settings/link/read' endpoint with a 2 second timeout.

    :param remote_config: Link configuration providing 'address', 'auth', 'username' and 'password'
    :return: Decoded JSON body on HTTP 200, otherwise an empty dict
    """
    handler = RequestHandler(
        auth=remote_config.get('auth'),
        username=remote_config.get('username'),
        password=remote_config.get('password'),
    )
    base_url = self.__format_address(remote_config.get('address'))
    read_url = "{}/unmanic/api/v2/settings/link/read".format(base_url)
    payload = {
        "uuid": self.session.uuid
    }
    response = handler.post(read_url, json=payload, timeout=2)
    if response.status_code == 200:
        return response.json()
    if response.status_code in [400, 404, 405, 500]:
        # These status codes are expected to carry a JSON error description; log it
        error_body = response.json()
        error_message = "Error while fetching remote installation link config. Message: '{}'".format(
            error_body.get('error'))
        self._log(error_message, message2=error_body.get('traceback', []), level='error')
    return {}
||
675 | |||
def push_remote_installation_link_config(self, configuration: dict):
    """
    Push this installation's side of a link configuration to the remote installation.

    The payload is built from *configuration* with the identity fields (uuid, name, version)
    replaced by this installation's own values, the sending/receiving flags swapped, and the
    local task count added. 'address' and 'available' are not sent.

    :param configuration: Local link configuration for the remote installation
    :return: True on HTTP 200, otherwise False
    """
    handler = RequestHandler(
        auth=configuration.get('auth'),
        username=configuration.get('username'),
        password=configuration.get('password'),
    )
    base_url = self.__format_address(configuration.get('address'))
    write_url = "{}/unmanic/api/v2/settings/link/write".format(base_url)

    # First generate an updated config
    link_config = self.__generate_default_config(configuration)

    # Update the bits for the remote instance
    link_config['uuid'] = self.session.uuid
    link_config['name'] = self.settings.get_installation_name()
    link_config['version'] = self.settings.read_version()

    # Configure settings. What this side sends, the remote side receives (and vice versa)
    link_config["enable_receiving_tasks"] = configuration.get('enable_sending_tasks')
    link_config["enable_sending_tasks"] = configuration.get('enable_receiving_tasks')

    # Current task count
    link_config["task_count"] = int(task.Task().get_total_task_list_count())

    # Fetch local config for distributed_worker_count_target
    worker_count_target = self.settings.get_distributed_worker_count_target()

    # Remove some of the other fields. These will need to be adjusted on the remote instance manually
    for local_only_field in ('address', 'available'):
        del link_config[local_only_field]

    payload = {
        'link_config':                     link_config,
        'distributed_worker_count_target': worker_count_target
    }
    response = handler.post(write_url, json=payload, timeout=2)
    if response.status_code == 200:
        return True
    if response.status_code in [400, 404, 405, 500]:
        # These status codes are expected to carry a JSON error description; log it
        error_body = response.json()
        error_message = "Error while pushing remote installation link config. Message: '{}'".format(
            error_body.get('error'))
        self._log(error_message, message2=error_body.get('traceback', []), level='error')
    return False
||
726 | |||
def check_remote_installation_for_available_workers(self):
    """
    Build a mapping of remote installations that can currently accept a remote task.

    An installation is only considered when:
        - it is marked as available
        - it is configured for sending tasks to
        - it has a UUID of at least 20 characters (otherwise the link may still be syncing)
        - the pending tasks query did not return an error
        - with task preloading enabled, its pending task count is below the preloading count
        - it reports at least one worker

    :return: Dict keyed by remote UUID holding connection details, library names and slot counts.
    """
    candidates = {}
    for link in self.settings.get_remote_installations():
        local_config = self.__generate_default_config(link)

        # Skip installations that are unavailable or not configured as a task target
        if not local_config.get('available') or not local_config.get('enable_sending_tasks'):
            continue

        # No valid UUID, no valid connection. This link may still be syncing
        if len(local_config.get('uuid', '')) < 20:
            continue

        remote_uuid = local_config.get('uuid')
        preloading = local_config.get('enable_task_preloading')

        try:
            worker_status = self.remote_api_get(local_config, '/unmanic/api/v2/workers/status')
            workers = worker_status.get('workers_status', [])

            # The pending task limit only applies when the queue is being preloaded
            max_pending_tasks = local_config.get('preloading_count') if preloading else 0
            pending = self.remote_api_post(local_config, '/unmanic/api/v2/pending/tasks', {
                "start": 0,
                "length": 1
            })
            if pending.get('error'):
                continue
            current_pending_tasks = int(pending.get('recordsFiltered', 0))
            if preloading and current_pending_tasks >= max_pending_tasks:
                self._log("Remote installation has exceeded the max remote pending task count ({})".format(
                    current_pending_tasks), level='debug')
                continue

            # Collect the names of the libraries configured on the remote installation
            libraries = self.remote_api_get(local_config, '/unmanic/api/v2/settings/libraries')
            library_names = [library.get('name') for library in libraries.get('libraries', [])]

            # Only installations reporting at least one worker are listed
            if len(workers) > 0:
                candidates[remote_uuid] = {
                    "address": local_config.get('address'),
                    "auth": local_config.get('auth'),
                    "username": local_config.get('username'),
                    "password": local_config.get('password'),
                    "enable_task_preloading": preloading,
                    "preloading_count": local_config.get('preloading_count'),
                    "library_names": library_names,
                    "available_slots": 0,
                }

            available_workers = False
            for worker in workers:
                # Every worker counts as a slot regardless of its status
                candidates[remote_uuid]['available_slots'] += 1
                # Idle-and-unpaused workers and busy workers both count as available;
                # only an idle worker that is paused does not.
                if not worker.get('idle') or not worker.get('paused'):
                    available_workers = True
                    candidates[remote_uuid]['available_workers'] = True

            # When preloading, add extra slots to fill up the remote pending task queue
            if available_workers and preloading:
                while current_pending_tasks <= max_pending_tasks:
                    candidates[remote_uuid]['available_slots'] += 1
                    current_pending_tasks += 1

        except Exception as e:
            self._log("Failed to contact remote installation '{}'".format(local_config.get('address')), message2=str(e),
                      level='warning')
            continue

    return candidates
||
823 | |||
def within_enabled_link_limits(self, frontend_messages=None):
    """
    Check that the number of linked remote installations is within the session's limit.

    Sessions with a level above 1 are never limited. Otherwise the count of configured
    remote installations is compared against the session's `link_count`.

    :param frontend_messages: Optional queue-like object; receives an error message when over the limit.
    :return: True when within limits, otherwise False.
    """
    # Fetch level from session
    current_session = Session()
    current_session.register_unmanic()
    if current_session.level > 1:
        return True

    # Fetch all linked remote installations
    remote_installations = self.settings.get_remote_installations()

    # Only reached when the session is limited to a fixed number of links
    if len(remote_installations) <= current_session.link_count:
        return True

    # Over the limit. If a frontend messages queue was provided, append a message to it
    if frontend_messages:
        frontend_messages.put(
            {
                'id': 'linkedInstallationLimits',
                'type': 'error',
                'code': 'linkedInstallationLimits',
                'message': '',
                'timeout': 0
            }
        )
    return False
||
859 | |||
def new_pending_task_create_on_remote_installation(self, remote_config: dict, abspath: str, library_id: int):
    """
    Ask a remote installation to create a pending task for a path it can already access.

    :param remote_config: Connection details of the remote (auth, username, password, address).
    :param abspath: Absolute path to the file as seen by the remote installation.
    :param library_id: ID of the library on the remote installation.
    :return: The decoded JSON body for HTTP 200 and 400 responses, an empty dict for any other
             status or an unexpected error, None when the request timed out or failed.
    """
    try:
        handler = RequestHandler(
            auth=remote_config.get('auth'),
            username=remote_config.get('username'),
            password=remote_config.get('password'),
        )
        base_address = self.__format_address(remote_config.get('address'))
        endpoint = "{}/unmanic/api/v2/pending/create".format(base_address)
        payload = {
            "path": abspath,
            "library_id": library_id,
            "type": 'remote',
        }
        response = handler.post(endpoint, json=payload, timeout=2)
        # A 400 body is returned as well so the caller can inspect its 'error' message
        if response.status_code in [200, 400]:
            return response.json()
        if response.status_code in [404, 405, 500]:
            error_body = response.json()
            self._log("Error while creating new remote pending task. Message: '{}'".format(error_body.get('error')),
                      message2=error_body.get('traceback', []), level='error')
        return {}
    except requests.exceptions.Timeout:
        self._log("Request to create remote pending task timed out '{}'".format(abspath), level='warning')
        return None
    except requests.exceptions.RequestException as e:
        self._log("Request to create remote pending task failed '{}'".format(abspath), message2=str(e), level='warning')
        return None
    except Exception as e:
        self._log("Failed to create remote pending task '{}'".format(abspath), message2=str(e), level='error')
    return {}
||
900 | |||
def send_file_to_remote_installation(self, remote_config: dict, path: str):
    """
    Upload a file to a remote installation's pending file endpoint.

    :param remote_config: Connection details of the remote installation.
    :param path: Path of the local file to upload.
    :return: The remote's response dict, or an empty dict when the response carries
             an 'error' or the upload raised an exception.
    """
    try:
        response = self.remote_api_post_file(remote_config, '/unmanic/api/v2/upload/pending/file', path)
        # Any response flagged with an error is treated as "no result"
        return {} if response.get('error') else response
    except requests.exceptions.RequestException as e:
        self._log("Request to upload to remote installation failed", message2=str(e), level='warning')
    except Exception as e:
        self._log("Failed to upload to remote installation", message2=str(e), level='error')
    return {}
||
920 | |||
def remove_task_from_remote_installation(self, remote_config: dict, remote_task_id: int):
    """
    Remove a single task from a remote installation's pending queue.

    :param remote_config: Connection details of the remote installation.
    :param remote_task_id: ID of the task on the remote installation.
    :return: The result of the remote delete call, None when the request timed out or failed,
             an empty dict on any other exception.
    """
    payload = {"id_list": [remote_task_id]}
    try:
        return self.remote_api_delete(remote_config, '/unmanic/api/v2/pending/tasks', payload, timeout=15)
    except requests.exceptions.Timeout:
        self._log("Request to remove remote task timed out", level='warning')
        return None
    except requests.exceptions.RequestException as e:
        self._log("Request to remove remote task failed", message2=str(e), level='warning')
        return None
    except Exception as e:
        self._log("Failed to remove remote pending task", message2=str(e), level='error')
    return {}
||
943 | |||
def get_the_remote_library_config_by_name(self, remote_config: dict, library_name: str):
    """
    Fetch a remote library's configuration by its name.

    :param remote_config: Connection details of the remote installation.
    :param library_name: Name of the library to look for in the remote's library list.
    :return: The first library dict whose 'name' matches, an empty dict when no library
             matches or an unexpected error occurred, None when the request timed out or failed.
             NOTE(review): callers must handle the None case before calling `.get` on the result.
    """
    try:
        # Fetch remote installation libraries
        results = self.remote_api_get(remote_config, '/unmanic/api/v2/settings/libraries', timeout=4)
        for library in results.get('libraries', []):
            if library.get('name') == library_name:
                return library
    except requests.exceptions.Timeout:
        # The messages below name this operation; previously they were copied from
        # set_the_remote_task_library and misreported which request had failed.
        self._log("Request to fetch remote library config timed out", level='warning')
        return None
    except requests.exceptions.RequestException as e:
        self._log("Request to fetch remote library config failed", message2=str(e), level='warning')
        return None
    except Exception as e:
        self._log("Failed to fetch remote library config", message2=str(e), level='error')
    return {}
||
967 | |||
def set_the_remote_task_library(self, remote_config: dict, remote_task_id: int, library_name: str):
    """
    Assign a library (by name) to a pending task on the remote installation.

    :param remote_config: Connection details of the remote installation.
    :param remote_task_id: ID of the task on the remote installation.
    :param library_name: Name of the library to assign to the task.
    :return: The remote's response dict, an empty dict when the response carries an 'error'
             or an unexpected error occurred, None when the request timed out or failed.
    """
    payload = {
        "id_list": [remote_task_id],
        "library_name": library_name,
    }
    try:
        response = self.remote_api_post(remote_config, '/unmanic/api/v2/pending/library/update', payload, timeout=7)
        return {} if response.get('error') else response
    except requests.exceptions.Timeout:
        self._log("Request to set remote task library timed out", level='warning')
        return None
    except requests.exceptions.RequestException as e:
        self._log("Request to set remote task library failed", message2=str(e), level='warning')
        return None
    except Exception as e:
        self._log("Failed to set remote task library", message2=str(e), level='error')
    return {}
||
996 | |||
def get_remote_pending_task_state(self, remote_config: dict, remote_task_id: int):
    """
    Query the status of a pending task on the remote installation.

    :param remote_config: Connection details of the remote installation.
    :param remote_task_id: ID of the task on the remote installation.
    :return: The remote's response as-is, or None when the request raised any exception.
    """
    payload = {"id_list": [remote_task_id]}
    try:
        return self.remote_api_post(remote_config, '/unmanic/api/v2/pending/status/get', payload, timeout=7)
    except requests.exceptions.Timeout:
        self._log("Request to get status of remote task timed out", level='warning')
    except requests.exceptions.RequestException as e:
        self._log("Request to get status of remote task failed", message2=str(e), level='warning')
    except Exception as e:
        self._log("Failed to get status of remote pending task", message2=str(e), level='error')
    return None
||
1018 | |||
def start_the_remote_task_by_id(self, remote_config: dict, remote_task_id: int):
    """
    Mark a pending task on the remote installation as ready so it can be started.

    :param remote_config: Connection details of the remote installation.
    :param remote_task_id: ID of the task on the remote installation.
    :return: The remote's response dict, an empty dict when the response carries an 'error'
             or an unexpected error occurred, None when the request timed out or failed.
    """
    payload = {"id_list": [remote_task_id]}
    try:
        response = self.remote_api_post(remote_config, '/unmanic/api/v2/pending/status/set/ready', payload, timeout=7)
        return {} if response.get('error') else response
    except requests.exceptions.Timeout:
        self._log("Request to start remote task timed out", level='warning')
        return None
    except requests.exceptions.RequestException as e:
        self._log("Request to start remote task failed", message2=str(e), level='warning')
        return None
    except Exception as e:
        self._log("Failed to start remote pending task", message2=str(e), level='error')
    return {}
||
1044 | |||
def get_all_worker_status(self, remote_config: dict):
    """
    Fetch the status of all workers on the remote installation.

    :param remote_config: Connection details of the remote installation.
    :return: The 'workers_status' list from the remote's response, or an empty list when
             the key is missing or the request raised any exception.
    """
    try:
        response = self.remote_api_get(remote_config, '/unmanic/api/v2/workers/status')
        return response.get('workers_status', [])
    except requests.exceptions.Timeout:
        self._log("Request to get worker status timed out", level='warning')
    except requests.exceptions.RequestException as e:
        self._log("Request to get worker status failed", message2=str(e), level='warning')
    except Exception as e:
        self._log("Failed to get worker status", message2=str(e), level='error')
    return []
||
1062 | |||
def get_single_worker_status(self, remote_config: dict, worker_id: str):
    """
    Fetch the status of one worker on the remote installation.

    :param remote_config: Connection details of the remote installation.
    :param worker_id: ID of the worker to look up.
    :return: The first worker status dict whose 'id' equals `worker_id`, or an empty dict.
    """
    all_workers = self.get_all_worker_status(remote_config)
    matching = (status for status in all_workers if status.get('id') == worker_id)
    return next(matching, {})
||
1076 | |||
def terminate_remote_worker(self, remote_config: dict, worker_id: str):
    """
    Request termination of a worker on the remote installation.

    :param remote_config: Connection details of the remote installation.
    :param worker_id: ID of the remote worker to terminate.
    :return: The result of the remote delete call, or an empty dict when the request
             raised any exception.
    """
    # NOTE(review): the worker ID is sent wrapped in a list - confirm against the remote API schema
    payload = {"worker_id": [worker_id]}
    try:
        return self.remote_api_delete(remote_config, '/unmanic/api/v2/workers/worker/terminate', payload)
    except requests.exceptions.Timeout:
        self._log("Request to terminate remote worker timed out", level='warning')
    except requests.exceptions.RequestException as e:
        self._log("Request to terminate remote worker failed", message2=str(e), level='warning')
    except Exception as e:
        self._log("Failed to terminate remote worker", message2=str(e), level='error')
    return {}
||
1097 | |||
def fetch_remote_task_data(self, remote_config: dict, remote_task_id: int, path: str):
    """
    Download and parse the data file of a completed remote task.

    The remote is first asked to generate a download link for the task data. The linked
    file is then downloaded to `path` and loaded as JSON.

    :param remote_config: Connection details of the remote installation.
    :param remote_task_id: ID of the task on the remote installation.
    :param path: Local path the data file is downloaded to.
    :return: The parsed JSON content, or an empty dict when no link was generated,
             the download did not produce a file, or any exception was raised.
    """
    task_data = {}
    try:
        # Request API generate a DL link
        endpoint = '/unmanic/api/v2/pending/download/data/id/{}'.format(remote_task_id)
        link_info = self.remote_api_get(remote_config, endpoint)
        link_id = link_info.get('link_id')
        if link_id:
            # Download the data file
            downloaded = self.remote_api_get_download(remote_config, '/unmanic/downloads/{}'.format(link_id), path)
            if downloaded and os.path.exists(path):
                with open(path) as data_file:
                    task_data = json.load(data_file)
    except requests.exceptions.Timeout:
        self._log("Request to fetch remote task data timed out", level='warning')
    except requests.exceptions.RequestException as e:
        self._log("Request to fetch remote task data failed", message2=str(e), level='warning')
    except Exception as e:
        self._log("Failed to fetch remote task data", message2=str(e), level='error')
    return task_data
||
1126 | |||
def fetch_remote_task_completed_file(self, remote_config: dict, remote_task_id: int, path: str):
    """
    Download the output file of a completed remote task.

    The remote is first asked to generate a download link for the task's file. The linked
    file is then downloaded to `path`.

    :param remote_config: Connection details of the remote installation.
    :param remote_task_id: ID of the task on the remote installation.
    :param path: Local path the file is downloaded to.
    :return: True when the download reported success and `path` exists, otherwise False.
    """
    try:
        # Request API generate a DL link
        endpoint = '/unmanic/api/v2/pending/download/file/id/{}'.format(remote_task_id)
        link_info = self.remote_api_get(remote_config, endpoint)
        link_id = link_info.get('link_id')
        if link_id:
            # Download the file
            downloaded = self.remote_api_get_download(remote_config, '/unmanic/downloads/{}'.format(link_id), path)
            if downloaded and os.path.exists(path):
                return True
    except requests.exceptions.Timeout:
        self._log("Request to fetch remote task completed file timed out", level='warning')
    except requests.exceptions.RequestException as e:
        self._log("Request to fetch remote task completed file failed", message2=str(e), level='warning')
    except Exception as e:
        self._log("Failed to fetch remote task completed file", message2=str(e), level='error')
    return False
||
1153 | |||
def import_remote_library_config(self, remote_config: dict, import_data: dict):
    """
    Import a library configuration on the remote installation.

    :param remote_config: Connection details of the remote installation.
    :param import_data: Library import payload posted to the remote as-is.
    :return: The remote's response dict, an empty dict when the response carries an 'error'
             or an unexpected error occurred, None when the request timed out or failed.
    """
    try:
        response = self.remote_api_post(remote_config, '/unmanic/api/v2/settings/library/import', import_data, timeout=60)
        return {} if response.get('error') else response
    except requests.exceptions.Timeout:
        self._log("Request to import remote library timed out", level='warning')
        return None
    except requests.exceptions.RequestException as e:
        self._log("Request to import remote library failed", message2=str(e), level='warning')
        return None
    except Exception as e:
        self._log("Failed to import remote library", message2=str(e), level='error')
    return {}
||
1176 | |||
1177 | |||
class RemoteTaskManager(threading.Thread):
    """
    Thread that pulls one task from a pending queue and processes it on a linked
    remote installation.

    The names below are class-level defaults. The methods of this class rebind most
    of them on the instance while a task is being handled.
    """
    # Not referenced by the methods visible in this part of the file
    paused = False

    # The task currently being handled (None when no task is set)
    current_task = None
    # List of log lines collected for the current task (None until a task is set)
    worker_log = None
    # time.time() values marking the start and finish of the current task
    start_time = None
    finish_time = None

    # Progress values; reset to '' and '0' when processing of a task begins
    worker_subprocess_percent = None
    worker_subprocess_elapsed = None

    # NOTE(review): mutable class-level dict, shared by all instances until an instance rebinds it
    worker_runners_info = {}
||
1190 | |||
def __init__(self, thread_id, name, installation_info, pending_queue, complete_queue, event):
    """
    Set up a manager thread for a single remote installation.

    :param thread_id: Identifier of this manager (used in log messages).
    :param name: Thread name; also used as the logger name.
    :param installation_info: Dict describing the remote installation (address, auth, ...).
    :param pending_queue: Queue the task to process is pulled from.
    :param complete_queue: Queue the processed task is placed on.
    :param event: Event object stored on the manager as `self.event`.
    """
    super(RemoteTaskManager, self).__init__(name=name)
    self.thread_id = thread_id
    self.name = name
    self.event = event
    self.installation_info = installation_info
    self.pending_queue = pending_queue
    self.complete_queue = complete_queue

    # Helper object used for all requests to the remote installation
    self.links = Links()

    # 'redundancy' flag. When this is set, the worker should die
    redundant = threading.Event()
    redundant.clear()
    self.redundant_flag = redundant

    # 'paused' flag. When this is set, the worker should be paused
    paused = threading.Event()
    paused.clear()
    self.paused_flag = paused

    # Logger dedicated to this worker, named after the thread
    logging_handler = unlogger.UnmanicLogger.__call__()
    self.logger = logging_handler.get_logger(self.name)
||
1213 | |||
def _log(self, message, message2='', level="info"):
    """
    Write a message to this worker's logger.

    :param message: Main log message.
    :param message2: Additional detail combined with `message` by `common.format_message`.
    :param level: Name of the logger method to call (e.g. 'info', 'warning', 'error').
    """
    formatted = common.format_message(message, message2)
    log_method = getattr(self.logger, level)
    log_method(formatted)
||
1217 | |||
def get_info(self):
    """
    Describe this manager.

    :return: Dict with the thread's 'name' and its 'installation_info'.
    """
    info = dict(name=self.name)
    info['installation_info'] = self.installation_info
    return info
||
1223 | |||
def run(self):
    """
    Thread entry point: pull one task from the pending queue and process it.

    A manager only runs for a single task and a connection to a single worker, so this
    method returns once that one task was handled (or no task could be fetched).
    Exceptions raised while processing are logged and not re-raised.
    """
    start_message = "Starting remote task manager {} - {}".format(self.thread_id, self.installation_info.get('address'))
    self._log(start_message)

    try:
        # Fetch the task without blocking; an empty queue is reported below
        task_item = self.pending_queue.get_nowait()
        # Configure this worker for the task, then process it
        self.__set_current_task(task_item)
        self.__process_task_queue_item()
    except queue.Empty:
        self._log("Remote task manager started by the pending queue was empty", level="warning")
    except Exception as error:
        self._log("Exception in processing job with {}:".format(self.name), message2=str(error),
                  level="exception")

    stop_message = "Stopping remote task manager {} - {}".format(self.thread_id, self.installation_info.get('address'))
    self._log(stop_message)
||
1246 | |||
1247 | def __set_current_task(self, current_task): |
||
1248 | """Sets the given task to the worker class""" |
||
1249 | self.current_task = current_task |
||
1250 | self.worker_log = [] |
||
1251 | |||
1252 | def __unset_current_task(self): |
||
1253 | self.current_task = None |
||
1254 | self.worker_runners_info = {} |
||
1255 | self.worker_log = [] |
||
1256 | |||
1257 | def __process_task_queue_item(self): |
||
1258 | """ |
||
1259 | Processes the set task. |
||
1260 | |||
1261 | :return: |
||
1262 | """ |
||
1263 | # Set the progress to an empty string |
||
1264 | self.worker_subprocess_percent = '' |
||
1265 | self.worker_subprocess_elapsed = '0' |
||
1266 | |||
1267 | # Log the start of the job |
||
1268 | self._log("Picked up job - {}".format(self.current_task.get_source_abspath())) |
||
1269 | |||
1270 | # Mark as being "in progress" |
||
1271 | self.current_task.set_status('in_progress') |
||
1272 | |||
1273 | # Start current task stats |
||
1274 | self.__set_start_task_stats() |
||
1275 | |||
1276 | # Process the file. Will return true if success, otherwise false |
||
1277 | success = self.__send_task_to_remote_worker_and_monitor() |
||
1278 | # Mark the task as either success or not |
||
1279 | self.current_task.set_success(success) |
||
1280 | |||
1281 | # Mark task completion statistics |
||
1282 | self.__set_finish_task_stats() |
||
1283 | |||
1284 | # Log completion of job |
||
1285 | self._log("Finished job - {}".format(self.current_task.get_source_abspath())) |
||
1286 | |||
1287 | # Place the task into the completed queue |
||
1288 | self.complete_queue.put(self.current_task) |
||
1289 | |||
1290 | # Reset the current file info for the next task |
||
1291 | self.__unset_current_task() |
||
1292 | |||
1293 | def __set_start_task_stats(self): |
||
1294 | """Sets the initial stats for the start of a task""" |
||
1295 | # Set the start time to now |
||
1296 | self.start_time = time.time() |
||
1297 | |||
1298 | # Clear the finish time |
||
1299 | self.finish_time = None |
||
1300 | |||
1301 | # Format our starting statistics data |
||
1302 | self.current_task.task.processed_by_worker = self.name |
||
1303 | self.current_task.task.start_time = self.start_time |
||
1304 | self.current_task.task.finish_time = self.finish_time |
||
1305 | |||
1306 | def __set_finish_task_stats(self): |
||
1307 | """Sets the final stats for the end of a task""" |
||
1308 | # Set the finish time to now |
||
1309 | self.finish_time = time.time() |
||
1310 | |||
1311 | # Set the finish time in the statistics data |
||
1312 | self.current_task.task.finish_time = self.finish_time |
||
1313 | |||
1314 | def __write_failure_to_worker_log(self): |
||
1315 | # Append long entry to say the worker was terminated |
||
1316 | self.worker_log.append("\n\nREMOTE TASK FAILED!") |
||
1317 | self.worker_log.append("\nAn error occurred during one of these stages:") |
||
1318 | self.worker_log.append("\n - while sending task to remote installation") |
||
1319 | self.worker_log.append("\n - during the remote task processing") |
||
1320 | self.worker_log.append("\n - while attempting to retrieve the completed task from the remote installation") |
||
1321 | self.worker_log.append("\nCheck Unmanic logs for more information.") |
||
1322 | self.worker_log.append("\nRelevant logs will be prefixed with 'ERROR:Unmanic.{}'".format(self.name)) |
||
1323 | self.current_task.save_command_log(self.worker_log) |
||
1324 | |||
1325 | def __send_task_to_remote_worker_and_monitor(self): |
||
1326 | """ |
||
1327 | Sends the task file to the remote installation to process. |
||
1328 | Monitors progress and then fetches the results |
||
1329 | |||
1330 | TODO: Manage network disconnections. |
||
1331 | - This manager object should be able to handle a network disconnect. However, we should terminate |
||
1332 | this manager if the remote task no longer exists. |
||
1333 | - Catch all API request exceptions. |
||
1334 | - Remove the failed_status_count - losing contact should be ok. What matters is when contact is made that |
||
1335 | the task still exists to be downloaded or status updated. |
||
1336 | |||
1337 | :return: |
||
1338 | """ |
||
1339 | # Set the absolute path to the original file |
||
1340 | original_abspath = self.current_task.get_source_abspath() |
||
1341 | |||
1342 | # Ensure file exists |
||
1343 | if not os.path.exists(original_abspath): |
||
1344 | self._log("File no longer exists '{}'. Was it removed?".format(original_abspath), level='warning') |
||
1345 | self.__write_failure_to_worker_log() |
||
1346 | return False |
||
1347 | |||
1348 | # Set the remote worker address |
||
1349 | address = self.installation_info.get('address') |
||
1350 | |||
1351 | lock_key = None |
||
1352 | |||
1353 | # Fetch the library name and path this task is for |
||
1354 | library_id = self.current_task.get_task_library_id() |
||
1355 | try: |
||
1356 | library = Library(library_id) |
||
1357 | except Exception as e: |
||
1358 | self._log("Unable to fetch library config for ID {}".format(library_id), level='exception') |
||
1359 | self.__write_failure_to_worker_log() |
||
1360 | return False |
||
1361 | library_name = library.get_name() |
||
1362 | library_path = library.get_path() |
||
1363 | |||
1364 | # Check if we can create the remote task with just a relative path |
||
1365 | # only create checksum and send file if the remote library path cannot accept relative paths or |
||
1366 | # it is configured for only receiving remote files |
||
1367 | send_file = False |
||
1368 | library_config = self.links.get_the_remote_library_config_by_name(self.installation_info, library_name) |
||
1369 | |||
1370 | # Check if remote library is configured only for receiving remote files |
||
1371 | if library_config.get('enable_remote_only'): |
||
1372 | send_file = True |
||
1373 | |||
1374 | # First attempt to create a task with an abspath on the remote installation |
||
1375 | remote_task_id = None |
||
1376 | if not send_file: |
||
1377 | remote_library_id = library_config.get('id') |
||
1378 | |||
1379 | # Remove library path from file abspath to create a relative path |
||
1380 | original_relpath = os.path.relpath(original_abspath, library_path) |
||
1381 | # Join remote library path to the relative path to form a remote library abspath to the file |
||
1382 | remote_original_abspath = os.path.join(library_config.get('path'), original_relpath) |
||
1383 | # Post the task creation. This will error if the file does not exist |
||
1384 | info = self.links.new_pending_task_create_on_remote_installation(self.installation_info, |
||
1385 | remote_original_abspath, |
||
1386 | remote_library_id) |
||
1387 | if not info: |
||
1388 | self._log("Unable to create remote pending task for path '{}'. Fallback to sending file.".format( |
||
1389 | remote_original_abspath), level='debug') |
||
1390 | send_file = True |
||
1391 | elif 'path does not exist' in info.get('error', '').lower(): |
||
1392 | self._log("Unable to find file in remote library's path '{}'. Fallback to sending file.".format( |
||
1393 | remote_original_abspath), level='debug') |
||
1394 | send_file = True |
||
1395 | elif 'task already exists' in info.get('error', '').lower(): |
||
1396 | self._log("A remote task already exists with the path '{}'. Fallback to sending file.".format( |
||
1397 | remote_original_abspath), level='error') |
||
1398 | self.__write_failure_to_worker_log() |
||
1399 | return False |
||
1400 | |||
1401 | # Set the remote task ID |
||
1402 | remote_task_id = info.get('id') |
||
1403 | |||
1404 | if send_file: |
||
1405 | initial_checksum = None |
||
1406 | if self.installation_info.get('enable_checksum_validation', False): |
||
1407 | # Get source file checksum |
||
1408 | initial_checksum = common.get_file_checksum(original_abspath) |
||
1409 | initial_file_size = os.path.getsize(original_abspath) |
||
1410 | |||
1411 | # Loop until we are able to upload the file to the remote installation |
||
1412 | info = {} |
||
1413 | while not self.redundant_flag.is_set(): |
||
1414 | # For files smaller than 100MB, just transfer them in parallel |
||
1415 | # Smaller files add a lot of time overhead with the waiting in line and it slows the whole process down |
||
1416 | # Larger files benefit from being transferred one at a time. |
||
1417 | if initial_file_size > 100000000: |
||
1418 | # Check for network transfer lock |
||
1419 | lock_key = self.links.acquire_network_transfer_lock(address, transfer_limit=1, lock_type='send') |
||
1420 | if not lock_key: |
||
1421 | self.event.wait(1) |
||
1422 | continue |
||
1423 | |||
1424 | # Send a file to a remote installation. |
||
1425 | self._log("Uploading file to remote installation '{}'".format(original_abspath), level='debug') |
||
1426 | info = self.links.send_file_to_remote_installation(self.installation_info, original_abspath) |
||
1427 | self.links.release_network_transfer_lock(lock_key) |
||
1428 | if not info: |
||
1429 | self._log("Failed to upload the file '{}'".format(original_abspath), level='error') |
||
1430 | self.__write_failure_to_worker_log() |
||
1431 | return False |
||
1432 | break |
||
1433 | |||
1434 | # Set the remote task ID |
||
1435 | remote_task_id = info.get('id') |
||
1436 | |||
1437 | # Compare uploaded file md5checksum |
||
1438 | if initial_checksum and info.get('checksum') != initial_checksum: |
||
1439 | self._log("The uploaded file did not return a correct checksum '{}'".format(original_abspath), level='error') |
||
1440 | # Send request to terminate the remote worker then return |
||
1441 | self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id) |
||
1442 | self.__write_failure_to_worker_log() |
||
1443 | return False |
||
1444 | |||
1445 | # Ensure at this point we have set the remote_task_id |
||
1446 | if remote_task_id is None: |
||
1447 | self._log("Failed to create remote task. Var remote_task_id is still None", level='error') |
||
1448 | self.__write_failure_to_worker_log() |
||
1449 | return False |
||
1450 | |||
1451 | # Set the library of the remote task using the library's name |
||
1452 | while not self.redundant_flag.is_set(): |
||
1453 | result = self.links.set_the_remote_task_library(self.installation_info, remote_task_id, library_name) |
||
1454 | if result is None: |
||
1455 | # Unable to reach remote installation |
||
1456 | self.event.wait(2) |
||
1457 | continue |
||
1458 | if not result.get('success'): |
||
1459 | self._log( |
||
1460 | "Failed to match a remote library named '{}'. Remote installation will use the default library".format( |
||
1461 | library_name), level='warning') |
||
1462 | # Just log the warning for this. If no matching library name is found it will remain set as the default library |
||
1463 | break |
||
1464 | if result.get('success'): |
||
1465 | break |
||
1466 | |||
1467 | # Start the remote task |
||
1468 | while not self.redundant_flag.is_set(): |
||
1469 | result = self.links.start_the_remote_task_by_id(self.installation_info, remote_task_id) |
||
1470 | if not result: |
||
1471 | # Unable to reach remote installation |
||
1472 | self.event.wait(2) |
||
1473 | continue |
||
1474 | if not result.get('success'): |
||
1475 | self._log("Failed to set initial remote pending task to status '{}'".format(original_abspath), level='error') |
||
1476 | # Send request to terminate the remote worker then return |
||
1477 | self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id) |
||
1478 | self.__write_failure_to_worker_log() |
||
1479 | return False |
||
1480 | if result.get('success'): |
||
1481 | break |
||
1482 | |||
1483 | # Loop while redundant_flag not set (while true because of below) |
||
1484 | worker_id = None |
||
1485 | task_status = '' |
||
1486 | last_status_fetch = 0 |
||
1487 | polling_delay = 5 |
||
1488 | while task_status != 'complete': |
||
1489 | self.event.wait(1) |
||
1490 | if self.redundant_flag.is_set(): |
||
1491 | # Send request to terminate the remote worker then exit |
||
1492 | if worker_id: |
||
1493 | self.links.terminate_remote_worker(self.installation_info, worker_id) |
||
1494 | break |
||
1495 | |||
1496 | # Only fetch the status every 5 seconds |
||
1497 | time_now = time.time() |
||
1498 | if last_status_fetch > (time_now - polling_delay): |
||
1499 | continue |
||
1500 | |||
1501 | # Fetch task status |
||
1502 | all_task_states = self.links.get_remote_pending_task_state(self.installation_info, remote_task_id) |
||
1503 | task_status = '' |
||
1504 | polling_delay = 5 |
||
1505 | if all_task_states: |
||
1506 | for ts in all_task_states.get('results', []): |
||
1507 | if str(ts.get('id')) == str(remote_task_id): |
||
1508 | # Found our task in the remote task list. Record its status and stop searching |
||
1509 | task_status = ts.get('status') |
||
1510 | break |
||
1511 | if not all_task_states.get('results', []): |
||
1512 | # Remote task list is empty |
||
1513 | task_status = 'removed' |
||
1514 | elif all_task_states.get('results') and task_status == '': |
||
1515 | # Remote task list did not contain this task |
||
1516 | task_status = 'removed' |
||
1517 | |||
1518 | # If the task status is 'complete', break the loop here and move onto the result retrieval |
||
1519 | # If all_task_states returned nothing (we are unable to connect to the remote installation), wait longer before polling again |
||
1520 | # If all_task_states did return results but our task was not found, the remote installation has removed our task |
||
1521 | # If the task status is not 'in_progress', loop here and wait for task to be picked up by a worker |
||
1522 | if task_status == 'complete': |
||
1523 | break |
||
1524 | elif not all_task_states: |
||
1525 | polling_delay = 10 |
||
1526 | last_status_fetch = time_now |
||
1527 | continue |
||
1528 | elif task_status == 'removed': |
||
1529 | self._log("Task has been removed by remote installation '{}'".format(original_abspath), level='error') |
||
1530 | self.__write_failure_to_worker_log() |
||
1531 | return False |
||
1532 | elif task_status != 'in_progress': |
||
1533 | # Mark this as the last time run |
||
1534 | last_status_fetch = time_now |
||
1535 | polling_delay = 10 |
||
1536 | continue |
||
1537 | |||
1538 | # Check if we know the task's worker ID already |
||
1539 | if not worker_id: |
||
1540 | # The task has been picked up by a worker, find out which one... |
||
1541 | workers_status = self.links.get_all_worker_status(self.installation_info) |
||
1542 | if not workers_status: |
||
1543 | # The request failed for some reason... Perhaps we lost contact with the remote installation |
||
1544 | # Mark this as the last time run |
||
1545 | last_status_fetch = time_now |
||
1546 | continue |
||
1547 | for worker in workers_status: |
||
1548 | if str(worker.get('current_task')) == str(remote_task_id): |
||
1549 | worker_id = worker.get('id') |
||
1550 | |||
1551 | # Fetch worker progress |
||
1552 | worker_status = self.links.get_single_worker_status(self.installation_info, worker_id) |
||
1553 | if not worker_status: |
||
1554 | # Mark this as the last time run |
||
1555 | last_status_fetch = time_now |
||
1556 | continue |
||
1557 | |||
1558 | # Update status |
||
1559 | self.paused = worker_status.get('paused') |
||
1560 | self.worker_log = worker_status.get('worker_log_tail') |
||
1561 | self.worker_runners_info = worker_status.get('runners_info') |
||
1562 | self.worker_subprocess_percent = worker_status.get('subprocess', {}).get('percent') |
||
1563 | self.worker_subprocess_elapsed = worker_status.get('subprocess', {}).get('elapsed') |
||
1564 | |||
1565 | # Mark this as the last time run |
||
1566 | last_status_fetch = time_now |
||
1567 | |||
1568 | # If the previous loop was broken because this thread needs to terminate, return False here (did not complete) |
||
1569 | if self.redundant_flag.is_set(): |
||
1570 | self.worker_log += ["\n\nREMOTE LINK MANAGER TERMINATED!"] |
||
1571 | self.current_task.save_command_log(self.worker_log) |
||
1572 | return False |
||
1573 | |||
1574 | self._log("Remote task completed '{}'".format(original_abspath), level='info') |
||
1575 | |||
1576 | # Create local cache path to download results |
||
1577 | task_cache_path = self.current_task.get_cache_path() |
||
1578 | # Ensure the cache directory exists |
||
1579 | cache_directory = os.path.dirname(os.path.abspath(task_cache_path)) |
||
1580 | if not os.path.exists(cache_directory): |
||
1581 | os.makedirs(cache_directory) |
||
1582 | |||
1583 | # Fetch remote task result data |
||
1584 | data = self.links.fetch_remote_task_data(self.installation_info, remote_task_id, |
||
1585 | os.path.join(cache_directory, 'remote_data.json')) |
||
1586 | |||
1587 | if not data: |
||
1588 | self._log( |
||
1589 | "Failed to retrieve remote task data for '{}'. NOTE: The cached files have not been removed from the remote host.".format( |
||
1590 | original_abspath), level='error') |
||
1591 | self.__write_failure_to_worker_log() |
||
1592 | return False |
||
1593 | self.worker_log = [data.get('log')] |
||
1594 | |||
1595 | # Save the completed command log |
||
1596 | self.current_task.save_command_log(self.worker_log) |
||
1597 | |||
1598 | # Fetch remote task file |
||
1599 | if data.get('task_success'): |
||
1600 | task_label = data.get('task_label') |
||
1601 | self._log( |
||
1602 | "Remote task #{} was successful, proceeding to download the completed file '{}'".format(remote_task_id, |
||
1603 | task_label), |
||
1604 | level='debug') |
||
1605 | # Set the new file out as the extension may have changed |
||
1606 | split_file_name = os.path.splitext(data.get('abspath')) |
||
1607 | file_extension = split_file_name[1].lstrip('.') |
||
1608 | self.current_task.set_cache_path(cache_directory, file_extension) |
||
1609 | # Read the updated cache path |
||
1610 | task_cache_path = self.current_task.get_cache_path() |
||
1611 | |||
1612 | # Loop until we are able to download the file from the remote installation |
||
1613 | while not self.redundant_flag.is_set(): |
||
1614 | # Check for network transfer lock |
||
1615 | lock_key = self.links.acquire_network_transfer_lock(address, transfer_limit=2, lock_type='receive') |
||
1616 | if not lock_key: |
||
1617 | self.event.wait(1) |
||
1618 | continue |
||
1619 | # Download the file |
||
1620 | self._log("Downloading file from remote installation '{}'".format(task_label), level='debug') |
||
1621 | success = self.links.fetch_remote_task_completed_file(self.installation_info, remote_task_id, task_cache_path) |
||
1622 | self.links.release_network_transfer_lock(lock_key) |
||
1623 | if not success: |
||
1624 | self._log("Failed to download file '{}'".format(os.path.basename(data.get('abspath'))), level='error') |
||
1625 | # Send request to terminate the remote worker then return |
||
1626 | self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id) |
||
1627 | self.__write_failure_to_worker_log() |
||
1628 | return False |
||
1629 | break |
||
1630 | |||
1631 | # If the previous loop was broken because this thread needs to terminate, return False here (did not complete) |
||
1632 | if self.redundant_flag.is_set(): |
||
1633 | self.worker_log += ["\n\nREMOTE LINK MANAGER TERMINATED!"] |
||
1634 | self.current_task.save_command_log(self.worker_log) |
||
1635 | return False |
||
1636 | |||
1637 | # Match checksum from task result data with downloaded file |
||
1638 | if self.installation_info.get('enable_checksum_validation', False): |
||
1639 | downloaded_checksum = common.get_file_checksum(task_cache_path) |
||
1640 | if downloaded_checksum != data.get('checksum'): |
||
1641 | self._log("The downloaded file did not produce a correct checksum '{}'".format(task_cache_path), |
||
1642 | level='error') |
||
1643 | # Send request to terminate the remote worker then return |
||
1644 | self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id) |
||
1645 | self.__write_failure_to_worker_log() |
||
1646 | return False |
||
1647 | |||
1648 | # Send request to terminate the remote worker then return |
||
1649 | self.links.remove_task_from_remote_installation(self.installation_info, remote_task_id) |
||
1650 | |||
1651 | return True |
||
1652 | |||
1653 | self.__write_failure_to_worker_log() |
||
1654 | return False |