kapsikkum-unmanic – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3  
4 """
5 unmanic.eventmonitor.py
6  
7 Written by: Josh.5 <jsunnex@gmail.com>
8 Date: 25 Feb 2021, (10:06 AM)
9  
10 Copyright:
11 Copyright (C) Josh Sunnex - All Rights Reserved
12  
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19  
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22  
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
26 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
27 DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
28 OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
29 OR OTHER DEALINGS IN THE SOFTWARE.
30  
31 """
32 import os
33 import queue
34 import threading
35 import time
36  
37 from unmanic import config
38 from unmanic.libs.library import Library
39 from unmanic.libs.plugins import PluginsHandler
40  
# The watchdog package is treated as optional. If either import fails, empty
# placeholder classes are bound to the same names so that the class definitions
# further down in this module can still be created.
# 'event_monitor_module' records which backend (if any) was imported.
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
except ImportError:
    event_monitor_module = None


    # Placeholder used only when watchdog could not be imported.
    class Observer(object):
        pass


    # Placeholder used only when watchdog could not be imported.
    class FileSystemEventHandler(object):
        pass
else:
    event_monitor_module = 'watchdog'
56  
57 from unmanic.libs import common, unlogger
58 from unmanic.libs.filetest import FileTest
59  
60  
class EventHandler(FileSystemEventHandler):
    """
    Queue library files for testing when a file system event is reported.

    The watchdog library does not map one-to-one onto inotify. It does not
    report MOVED_TO, CREATED, DELETE, MODIFIED, CLOSE_WRITE in the way the
    previously used library did. The original author recorded the following
    event sequences for a Unix file system:
        - Read              = []                                : Reading a file triggers no events
        - Modify            = ["modified", "closed"]            :
        - Move (atomic)     = ["created", "modified"]           :
        - Move (non-atomic) = ["created", "modified", "closed"] : Many 'modified' events while the file is "copied"
        - Copy              = ["created", "modified", "closed"] : ^ ditto
        - Create            = ["created", "modified", "closed"] : ^ ditto
        - Delete            = ["deleted", "modified"]           :
        - Hardlink          = ["created", "modified"]           :

    Based on that table, only the "created" and "closed" events are acted on.
    """

    def __init__(self, files_to_test, library_id):
        """
        :param files_to_test: Queue-like object; a dict is `put` on it for every accepted file event.
        :param library_id: ID of the library this handler reports events for.
        """
        self.files_to_test = files_to_test
        self.library_id = library_id
        self.name = "EventProcessor"
        # The logger is fetched lazily on the first call to _log()
        self.logger = None
        flag = threading.Event()
        flag.clear()
        self.abort_flag = flag

    def _log(self, message, message2='', level="info"):
        """
        Write a message to this handler's logger, fetching the logger on first use.

        :param message: Primary message text.
        :param message2: Secondary text passed along to `common.format_message`.
        :param level: Name of the logger method to call (eg. "info", "debug").
        :return:
        """
        if not self.logger:
            self.logger = unlogger.UnmanicLogger.__call__().get_logger(self.name)
        formatted = common.format_message(message, message2)
        write = getattr(self.logger, level)
        write(formatted)

    def on_any_event(self, event):
        """
        Handle a file system event.

        Only "created" and "closed" events on non-directories are queued.

        :param event: Event object exposing `event_type`, `is_directory` and `src_path`.
        :return:
        """
        # Anything other than a "created" or "closed" event is of no interest
        if event.event_type not in ("created", "closed"):
            return

        # Ensure event was not for a directory
        if event.is_directory:
            self._log("Detected event is for a directory. Ignoring...", level="debug")
            return

        self._log("Detected '{}' event on file path '{}'".format(event.event_type, event.src_path))
        queued_item = {
            'src_path':   event.src_path,
            'library_id': self.library_id,
        }
        self.files_to_test.put(queued_item)
106  
107  
class EventMonitorManager(threading.Thread):
    """
    EventMonitorManager

    Manage the EventProcessor (file system observer) thread.
    If the settings for enabling the EventProcessor changes, this manager
    class will stop or start the EventProcessor thread accordingly.

    File events reported by the observer are placed on an internal queue
    (`files_to_test`) and are tested one at a time from this thread's loop.

    """

    def __init__(self, data_queues, event):
        """
        :param data_queues: Mapping of shared queues. This class reads the
            'frontend_messages' and 'inotifytasks' entries.
        :param event: Object providing `wait(timeout)`; used for every delay in
            this thread (presumably a threading.Event - confirm with the caller).
        """
        super(EventMonitorManager, self).__init__(name='EventMonitorManager')
        self.name = "EventMonitorManager"
        self.data_queues = data_queues
        self.settings = config.Config()
        # The logger is fetched lazily on the first call to _log()
        self.logger = None
        self.event = event

        # Create an event queue
        self.files_to_test = queue.Queue()

        self.abort_flag = threading.Event()
        self.abort_flag.clear()

        # Holds the running observer. It is None whenever no observer is running.
        self.event_observer_thread = None
        self.event_observer_threads = []

    def _log(self, message, message2='', level="info"):
        """
        Write a message to this manager's logger, fetching the logger on first use.

        :param message: Primary message text.
        :param message2: Secondary text passed along to `common.format_message`.
        :param level: Name of the logger method to call (eg. "info", "exception").
        :return:
        """
        if not self.logger:
            unmanic_logging = unlogger.UnmanicLogger.__call__()
            self.logger = unmanic_logging.get_logger(self.name)
        message = common.format_message(message, message2)
        getattr(self.logger, level)(message)

    def stop(self):
        """
        Signal the main loop in run() to exit.

        :return:
        """
        self.abort_flag.set()

    def run(self):
        """
        Main loop of the manager thread.

        Each pass either tests one queued file event, or re-reads the library
        settings and starts/stops the observer to match them. The observer is
        stopped when the loop exits.

        :return:
        """
        self._log("Starting EventMonitorManager loop")
        while not self.abort_flag.is_set():
            self.event.wait(.5)

            if not self.system_configuration_is_valid():
                self.event.wait(2)
                continue

            # Queued file events take priority; handle one per pass
            if not self.files_to_test.empty():
                item = self.files_to_test.get()
                pathname = item.get('src_path')
                library_id = item.get('library_id')
                self.manage_event_queue(pathname, library_id)
                continue

            # Check if monitor is enabled for at least one library
            enable_inotify = False
            for lib_info in Library.get_all_libraries():
                try:
                    library = Library(lib_info['id'])
                except Exception:
                    self._log("Unable to fetch library config for ID {}".format(lib_info['id']), level='exception')
                    continue
                # Check if the library is configured for remote files only
                if library.get_enable_remote_only():
                    # This library is configured to receive remote files only... Never enable the file monitor
                    continue
                # Check if file monitor is enabled on any library
                if library.get_enable_inotify():
                    enable_inotify = True

            # If at least one library has the monitor enabled, then start it. Otherwise stop the monitor process
            if enable_inotify:
                # If enabled, ensure it is running and start it if it is not
                if not self.event_observer_thread:
                    self.start_event_processor()
            else:
                # If not enabled, ensure the EventProcessor is not running and stop it if it is
                if self.event_observer_thread:
                    self.stop_event_processor()
            # Add delay
            self.event.wait(2)

        self.stop_event_processor()
        self._log("Leaving EventMonitorManager loop...")

    def system_configuration_is_valid(self):
        """
        Check and ensure the system configuration is correct for running

        Both checks are always executed (no short-circuit); each is handed the
        'frontend_messages' queue.

        :return: True only if no incompatible plugins are enabled and the
            library count is within its limits.
        """
        valid = True
        plugin_handler = PluginsHandler()
        if plugin_handler.get_incompatible_enabled_plugins(self.data_queues.get('frontend_messages')):
            valid = False
        if not Library.within_library_count_limits(self.data_queues.get('frontend_messages')):
            valid = False
        return valid

    def start_event_processor(self):
        """
        Start the EventProcessor thread if it is not already running.

        An observer is only kept on `self.event_observer_thread` once it has
        been started. If no library path could be scheduled, nothing is stored
        so that:
            - the main loop tries again on a later pass (eg. once a missing
              library path becomes available), and
            - stop_event_processor() never tries to join an observer that was
              never started.

        :return:
        """
        if self.event_observer_thread:
            self._log("Given signal to start the EventProcessor thread, but it is already running....")
            return

        observer = Observer()
        monitoring_path = False
        for lib_info in Library.get_all_libraries():
            self.event.wait(.2)
            try:
                library = Library(lib_info['id'])
            except Exception:
                self._log("Unable to fetch library config for ID {}".format(lib_info['id']), level='exception')
                continue
            # Check if the library is configured for remote files only
            if library.get_enable_remote_only():
                # This library is configured to receive remote files only... Never enable the file monitor
                continue
            # Check if the file monitor is enabled on this library
            if not library.get_enable_inotify():
                continue
            library_path = library.get_path()
            if not os.path.exists(library_path):
                self._log("Library path '{}' does not exist. Not adding it to the monitor".format(library_path),
                          level="debug")
                continue
            self._log("Adding library path to monitor '{}'".format(library_path))
            event_handler = EventHandler(self.files_to_test, library.get_id())
            observer.schedule(event_handler, library_path, recursive=True)
            monitoring_path = True

        # Only start observer if a path was added to be monitored
        if not monitoring_path:
            return

        self._log("EventMonitorManager spawning EventProcessor thread...")
        observer.start()
        # Record the observer only after it has successfully started
        self.event_observer_thread = observer

    def stop_event_processor(self):
        """
        Stop the EventProcessor thread if it is running.

        `self.event_observer_thread` is always reset to None afterwards.

        :return:
        """
        if self.event_observer_thread:
            self._log("EventMonitorManager stopping EventProcessor thread...")

            self._log("Sending thread EventProcessor abort signal")
            self.event_observer_thread.stop()

            self._log("Waiting for thread EventProcessor to stop")
            self.event_observer_thread.join()
            self._log("Thread EventProcessor has successfully stopped")
        else:
            self._log("Given signal to stop the EventProcessor thread, but it is not running...")

        self.event_observer_thread = None

    def manage_event_queue(self, pathname, library_id):
        """
        Manage all monitored events

        Unlike the library scanner, all events are processed sequentially one at a time.
        This avoids a file being added twice on 2 events.

        Any exception raised while testing the file is logged and swallowed so
        that a single bad path does not end the manager thread.

        :param pathname: Path of the file the event was reported for.
        :param library_id: ID of the library the event handler belongs to.
        :return:
        """
        # Test file to be added to task list. Add it if required
        try:
            file_test = FileTest(library_id)
            result, issues, priority_score = file_test.should_file_be_added_to_task_list(pathname)
            # Log any error messages
            for issue in issues:
                if isinstance(issue, dict):
                    self._log(issue.get('message'))
                else:
                    self._log(issue)
            # If file needs to be added, then add it
            if result:
                self.__add_path_to_queue(pathname, library_id, priority_score)
        except UnicodeEncodeError:
            self._log("File contains Unicode characters that cannot be processed. Ignoring.", level="warning")
        except Exception as e:
            self._log("Exception testing file path in {}. Ignoring.".format(self.name), message2=str(e), level="exception")

    def __add_path_to_queue(self, pathname, library_id, priority_score):
        """
        Add a given path to the pending task queue

        The item is put on the 'inotifytasks' queue from `data_queues`.

        :param pathname: Path of the file to add.
        :param library_id: ID of the library the file belongs to.
        :param priority_score: Score returned by the file test for this path.
        :return:
        """
        self.data_queues.get('inotifytasks').put({
            'pathname':       pathname,
            'library_id':     library_id,
            'priority_score': priority_score,
        })