kapsikkum-unmanic – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3  
4 """
5 unmanic.libraryscanner.py
6  
7 Written by: Josh.5 <jsunnex@gmail.com>
8 Date: 20 Aug 2021, (5:37 PM)
9  
10 Copyright:
11 Copyright (C) Josh Sunnex - All Rights Reserved
12  
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19  
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22  
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
26 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
27 DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
28 OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
29 OR OTHER DEALINGS IN THE SOFTWARE.
30  
31 """
32 import gc
33 import json
34 import os
35 import queue
36 import threading
37 import time
38  
39 import psutil
40 import schedule
41  
42 from unmanic import config
43 from unmanic.libs import common, unlogger
44 from unmanic.libs.filetest import FileTesterThread
45 from unmanic.libs.library import Library
46 from unmanic.libs.plugins import PluginsHandler
47  
48  
49 class LibraryScannerManager(threading.Thread):
50 def __init__(self, data_queues, event):
51 super(LibraryScannerManager, self).__init__(name='LibraryScannerManager')
52 self.interval = 0
53 self.firstrun = True
54 self.data_queues = data_queues
55 self.settings = config.Config()
56 self.logger = None
57 self.event = event
58 self.scheduledtasks = data_queues["scheduledtasks"]
59 self.library_scanner_triggers = data_queues["library_scanner_triggers"]
60 self.abort_flag = threading.Event()
61 self.abort_flag.clear()
62 self.scheduler = schedule.Scheduler()
63  
64 self.file_test_managers = {}
65 self.files_to_test = queue.Queue()
66 self.files_to_process = queue.Queue()
67  
68 def _log(self, message, message2='', level="info"):
69 if not self.logger:
70 unmanic_logging = unlogger.UnmanicLogger.__call__()
71 self.logger = unmanic_logging.get_logger(self.name)
72 message = common.format_message(message, message2)
73 getattr(self.logger, level)(message)
74  
75 def stop(self):
76 self.abort_flag.set()
77 # Stop all child threads
78 self.stop_all_file_test_managers()
79  
80 def abort_is_set(self):
81 # Check if the abort flag is set
82 if self.abort_flag.is_set():
83 # Return True straight away if it is
84 return True
85 # Sleep for a fraction of a second to prevent CPU pinning
86 self.event.wait(.1)
87 # Return False
88 return False
89  
90 def run(self):
91 self._log("Starting LibraryScanner Monitor loop")
92 while not self.abort_is_set():
93 self.event.wait(1)
94  
95 # Main loop to configure the scheduler
96 if int(self.settings.get_schedule_full_scan_minutes()) != self.interval:
97 self.interval = int(self.settings.get_schedule_full_scan_minutes())
98 if self.interval and self.interval != 0:
99 self._log("Setting LibraryScanner schedule to scan every {} mins...".format(self.interval))
100 # Configure schedule
101 self.scheduler.every(self.interval).minutes.do(self.scheduled_job)
102 # Register application
103 self.register_unmanic()
104  
105 # First run the task
106 if self.settings.get_run_full_scan_on_start() and self.firstrun:
107 self._log("Running LibraryScanner on start")
108 self.scheduled_job()
109 self.firstrun = False
110  
111 # Then loop and wait for the schedule
112 while not self.abort_is_set():
113 # Delay for 1 second before checking again.
114 self.event.wait(1)
115  
116 # Check if a manual library scan was triggered
117 try:
118 if not self.library_scanner_triggers.empty():
119 trigger = self.library_scanner_triggers.get_nowait()
120 if trigger == "library_scan":
121 self.scheduled_job()
122 break
123 except queue.Empty:
124 continue
125 except Exception as e:
126 self._log("Exception in retrieving library scanner trigger {}:".format(self.name), message2=str(e),
127 level="exception")
128  
129 # Check if library scanner is enabled
130 if not self.settings.get_enable_library_scanner():
131 # The library scanner is not enabled. Dont run anything
132 continue
133  
134 # Check if scheduled task is due
135 self.scheduler.run_pending()
136  
137 # If the settings have changed, then break this loop and clear
138 # the scheduled job resetting to the new interval
139 if int(self.settings.get_schedule_full_scan_minutes()) != self.interval:
140 self._log("Resetting LibraryScanner schedule")
141 break
142 self.scheduler.clear()
143  
144 self._log("Leaving LibraryScanner Monitor loop...")
145  
146 def scheduled_job(self):
147 """
148 Function called by the scheduled task
149  
150 :return:
151 """
152 if not self.system_configuration_is_valid():
153 self._log("Skipping library scanner due invalid system configuration.", level='warning')
154 return
155  
156 # For each configured library, check if a library scan is required
157 no_libraries_configured = True
158 for lib_info in Library.get_all_libraries():
159 no_libraries_configured = False
160 try:
161 library = Library(lib_info['id'])
162 except Exception as e:
163 self._log("Unable to fetch library config for ID {}".format(lib_info['id']), level='exception')
164 continue
165 # Check if the library is configured for remote files only
166 if library.get_enable_remote_only():
167 # This library is configured to receive remote files only... Never run a library scan on it
168 continue
169 # Check if library scanner is enabled on any library
170 if library.get_enable_scanner():
171 # Run library scan
172 self._log("Running full library scan on library '{}'".format(library.get_name()))
173 self.scan_library_path(library.get_path(), library.get_id())
174 if no_libraries_configured:
175 self._log("No libraries are configured to run a library scan")
176  
177 def system_configuration_is_valid(self):
178 """
179 Check and ensure the system configuration is correct for running
180  
181 :return:
182 """
183 valid = True
184 plugin_handler = PluginsHandler()
185 if plugin_handler.get_incompatible_enabled_plugins(self.data_queues.get('frontend_messages')):
186 valid = False
187 if not Library.within_library_count_limits(self.data_queues.get('frontend_messages')):
188 valid = False
189 return valid
190  
191 def add_path_to_queue(self, pathname, library_id, priority_score):
192 self.scheduledtasks.put({
193 'pathname': pathname,
194 'library_id': library_id,
195 'priority_score': priority_score,
196 })
197  
198 def start_results_manager_thread(self, manager_id, status_updates, library_id):
199 manager = FileTesterThread("FileTesterThread-{}".format(manager_id), self.files_to_test,
200 self.files_to_process, status_updates, library_id, self.event)
201 manager.daemon = True
202 manager.start()
203 self.file_test_managers[manager_id] = manager
204  
205 def stop_all_file_test_managers(self):
206 for manager_id in self.file_test_managers:
207 self.file_test_managers[manager_id].abort_flag.set()
208  
209 def scan_library_path(self, library_path, library_id):
210 """
211 Run a scan of the given library path
212  
213 :param library_path:
214 :param library_id:
215 :return:
216 """
217 if not os.path.exists(library_path):
218 self._log("Path does not exist - '{}'".format(library_path), level="warning")
219 return
220 if self.settings.get_debugging():
221 self._log("Scanning directory - '{}'".format(library_path), level="debug")
222  
223 # Push status notification to frontend
224 frontend_messages = self.data_queues.get('frontend_messages')
225  
226 # Start X number of FileTesterThread threads
227 concurrent_file_testers = self.settings.get_concurrent_file_testers()
228 status_updates = queue.Queue()
229 self.file_test_managers = {}
230 for results_manager_id in range(int(concurrent_file_testers)):
231 self.start_results_manager_thread(results_manager_id, status_updates, library_id)
232  
233 start_time = time.time()
234  
235 frontend_messages.update(
236 {
237 'id': 'libraryScanProgress',
238 'type': 'status',
239 'code': 'libraryScanProgress',
240 'message': "Scanning directory - '{}'".format(library_path),
241 'timeout': 0
242 }
243 )
244  
245 follow_symlinks = self.settings.get_follow_symlinks()
246 total_file_count = 0
247 current_file = ''
248 percent_completed_string = ''
249 for root, subFolders, files in os.walk(library_path, followlinks=follow_symlinks):
250 if self.abort_flag.is_set():
251 break
252 if self.settings.get_debugging():
253 self._log(json.dumps(files, indent=2), level="debug")
254 # Add all files in this path that match our container filter
255 for file_path in files:
256 if self.abort_flag.is_set():
257 break
258  
259 # Place file's full path in queue to be tested
260 self.files_to_test.put(os.path.join(root, file_path))
261 total_file_count += 1
262  
263 # Update status messages while fetching file list
264 if not status_updates.empty():
265 current_file = status_updates.get()
266 percent_completed_string = 'Testing: {}'.format(current_file)
267 frontend_messages.update(
268 {
269 'id': 'libraryScanProgress',
270 'type': 'status',
271 'code': 'libraryScanProgress',
272 'message': percent_completed_string,
273 'timeout': 0
274 }
275 )
276  
277 # Loop while waiting for all threads to finish
278 double_check = 0
279 while not self.abort_flag.is_set():
280 frontend_messages.update(
281 {
282 'id': 'libraryScanProgress',
283 'type': 'status',
284 'code': 'libraryScanProgress',
285 'message': percent_completed_string,
286 'timeout': 0
287 }
288 )
289 # Check if all files have been tested
290 if self.files_to_test.empty() and self.files_to_process.empty() and status_updates.empty():
291 percent_completed_string = '100%'
292 # Add a "double check" section.
293 # This is used to ensure that the loop does not prematurely exit when the last file tests still
294 # progressing that have not yet made it to the "files_to_process" queue.
295 double_check += 1
296 if double_check > 5:
297 # There are not more files to test. Mark manager threads as completed
298 self.stop_all_file_test_managers()
299 break
300 self.event.wait(1)
301 continue
302  
303 # Calculate percent of files tested
304 if not self.files_to_test.empty():
305 current_queue_size = self.files_to_test.qsize()
306 if int(total_file_count) > 0 and int(current_queue_size) > 0:
307 percent_remaining = int((int(current_queue_size) / int(total_file_count)) * 100)
308 percent_completed = int(100 - percent_remaining)
309 percent_completed_string = '{}% - Testing: {}'.format(percent_completed, current_file)
310 elif current_file:
311 percent_completed_string = '{}% - Testing: {}'.format('???', current_file)
312  
313 # Fetch frontend messages from queue
314 if not status_updates.empty():
315 current_file = status_updates.get()
316 continue
317 elif not self.files_to_process.empty():
318 item = self.files_to_process.get()
319 self.add_path_to_queue(item.get('path'), library_id, item.get('priority_score'))
320 continue
321 else:
322 self.event.wait(.1)
323  
324 # Wait for threads to finish
325 for manager_id in self.file_test_managers:
326 self.file_test_managers[manager_id].abort_flag.set()
327 self.file_test_managers[manager_id].join(2)
328  
329 self._log("Library scan completed in {} seconds".format((time.time() - start_time)), level="warning")
330  
331 # Run a manual garbage collection
332 gc.collect()
333  
334 # Remove frontend status message
335 frontend_messages.remove_item('libraryScanProgress')
336  
337 def register_unmanic(self):
338 from unmanic.libs import session
339 s = session.Session()
340 s.register_unmanic()