kapsikkum-unmanic – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3  
4 """
5 unmanic.taskqueue.py
6  
7 Written by: Josh.5 <jsunnex@gmail.com>
8 Date: 23 Apr 2019, (19:17 PM)
9  
10 Copyright:
11 Copyright (C) Josh Sunnex - All Rights Reserved
12  
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19  
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22  
23 THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
24 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
26 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
27 DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
28 OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
29 OR OTHER DEALINGS IN THE SOFTWARE.
30  
31 """
32  
33 from unmanic.libs import task
34 from unmanic.libs import common
35 from unmanic.libs.unmodels import Libraries, LibraryTags, Tags
36 from unmanic.libs.unmodels.tasks import Tasks
37  
38 """
39  
40 An object to contain all details of the job queue in such a way that it is presented in a synchronous list
41 while being able to be accessed by a number of threads simultaneously
42  
43 """
44  
45  
def build_tasks_count_query(status):
    """
    Report whether any task currently holds the given status.

    The lookup is capped at one row, so the value is meant to be used as an
    "is there anything at all?" indicator rather than as a real total:
    0 when no task matches, a non-zero count otherwise.

    # TODO: look into peewee dynamic query building (surely this exists)

    :param status: Value compared against ``Tasks.status``.
    :return: Result of ``count()`` on the one-row-limited query.
    """
    # A single matching row is enough to know the list is not empty
    status_matches = (Tasks.status == status)
    single_row_query = Tasks.select().where(status_matches).limit(1)
    return single_row_query.count()
60  
61  
def build_tasks_query(status, sort_by='id', sort_order='asc', local_only=False, library_names=None, library_tags=None):
    """
    Return the first task in the task list that matches the given filters.

    Tasks are filtered by status (and optionally by type, library name and
    library tag), sorted by ``sort_by`` / ``sort_order`` and limited to a
    single row.

    :param status: Value compared against ``Tasks.status``.
    :param sort_by: Field of ``Tasks`` to order by. May be given either as the
        field object itself (e.g. ``Tasks.priority``) or as the field's
        attribute name (e.g. ``'id'``).
    :param sort_order: ``'asc'`` for ascending; any other value sorts descending.
    :param local_only: When truthy, only tasks whose type is ``'local'`` are considered.
    :param library_names: Optional collection of library names to restrict to.
        ``None`` disables this filter.
    :param library_tags: Optional collection of tag names to restrict to.
        ``None`` disables this filter; an empty collection matches only
        libraries that have no tags.
    :return: Whatever ``query.first()`` yields for the built query
        (presumably a ``Tasks`` row, or ``None`` when nothing matches).
    :raises AttributeError: If ``sort_by`` is a string that does not name an
        attribute of ``Tasks``.
    """
    # The default value of ``sort_by`` is a plain string, which has no
    # ``asc()`` / ``desc()`` methods. Resolve names to the Tasks field so the
    # documented default actually works.
    if isinstance(sort_by, str):
        sort_by = getattr(Tasks, sort_by)

    # Base query: filter by status
    query = Tasks.select().where((Tasks.status == status))

    # Optionally restrict to locally created tasks
    if local_only:
        query = query.where((Tasks.type == 'local'))

    # Join the library of each task so that it can be filtered by name and tags
    query = query.join(Libraries, on=(Libraries.id == Tasks.library_id))
    if library_names is not None:
        query = query.where(Libraries.name.in_(library_names))
    if library_tags is not None:
        # Outer joins keep libraries that have no tags at all in the result set
        query = query.join(LibraryTags, join_type='LEFT OUTER JOIN')
        query = query.join(Tags, join_type='LEFT OUTER JOIN')
        if library_tags:
            query = query.where(Tags.name.in_(library_tags))
        else:
            # Handle a query where the list is empty. In this case we want to match for only libraries that have no tags
            query = query.where(Tags.name.is_null())

    # Limit to one result
    query = query.limit(1)
    if sort_order == 'asc':
        query = query.order_by(sort_by.asc())
    else:
        query = query.order_by(sort_by.desc())
    return query.first()
101  
102  
def build_tasks_query_full_task_list(status, sort_by='id', sort_order='asc', limit=None):
    """
    Return all task items in the task list filtered by status.

    The query is sorted by ``sort_by`` / ``sort_order`` and may be capped by
    ``limit``.

    :param status: Value compared against ``Tasks.status``.
    :param sort_by: Field of ``Tasks`` to order by. May be given either as the
        field object itself (e.g. ``Tasks.priority``) or as the field's
        attribute name (e.g. ``'id'``).
    :param sort_order: ``'asc'`` for ascending; any other value sorts descending.
    :param limit: Maximum number of rows to return. Falsy values (``None``, ``0``)
        leave the query unlimited.
    :return: The query with ``dicts()`` applied, so rows are yielded as dictionaries.
    :raises AttributeError: If ``sort_by`` is a string that does not name an
        attribute of ``Tasks``.
    """
    # The default value of ``sort_by`` is a plain string, which has no
    # ``asc()`` / ``desc()`` methods. Resolve names to the Tasks field so the
    # documented default actually works.
    if isinstance(sort_by, str):
        sort_by = getattr(Tasks, sort_by)

    query = Tasks.select(Tasks).where((Tasks.status == status))

    # Set the sort order
    if sort_order == 'asc':
        query = query.order_by(sort_by.asc())
    else:
        query = query.order_by(sort_by.desc())

    # Set query limit if one was given
    if limit:
        query = query.limit(limit)

    # Return results as dictionary
    return query.dicts()
129  
130  
def fetch_next_task_filtered(status, sort_by='id', sort_order='asc', local_only=False, library_names=None, library_tags=None):
    """
    Look up the next task for a status and load it into a ``task.Task`` object.

    All filter and sort arguments are forwarded unchanged to ``build_tasks_query``.

    :param status: Status the task must currently have.
    :param sort_by: Sort field handed to ``build_tasks_query``.
    :param sort_order: Sort direction handed to ``build_tasks_query``.
    :param local_only: Forwarded to ``build_tasks_query``.
    :param library_names: Forwarded to ``build_tasks_query``.
    :param library_tags: Forwarded to ``build_tasks_query``.
    :return: A ``task.Task`` loaded via the absolute path of the matched row,
        or ``False`` when the query produced nothing.
    """
    # Make sure a matching row exists before building the task object
    matched_row = build_tasks_query(
        status,
        sort_by=sort_by,
        sort_order=sort_order,
        local_only=local_only,
        library_names=library_names,
        library_tags=library_tags,
    )
    if matched_row:
        # Populate a fresh task object using the row's abspath
        loaded_task = task.Task()
        loaded_task.read_and_set_task_by_absolute_path(matched_row.abspath)
        return loaded_task
    return False
152  
153  
class TaskQueue(object):
    """
    TaskQueue

    Thin access layer over the task list: lists tasks by status, fetches the
    next task for a status, checks whether a status list is empty and updates
    the status of a task item.

    Attributes:
        name: Fixed identifier ('TaskQueue') used when requesting the logger.
        data_queues: Container passed in by the caller; it is indexed with the
            key "logging" to obtain the object that provides the logger.
        logger: Logger returned by ``data_queues["logging"].get_logger(name)``.
        sort_by: Field used to order task queries (``Tasks.priority``).
        sort_order: Direction used to order task queries ('desc').

    """

    def __init__(self, data_queues):
        self.name = 'TaskQueue'
        self.data_queues = data_queues
        logging_provider = data_queues["logging"]
        self.logger = logging_provider.get_logger(self.name)

        # Ordering applied to every query issued through this object
        self.sort_by = Tasks.priority
        self.sort_order = 'desc'

    def _log(self, message, message2='', level="info"):
        """
        Format the message parts and emit them through the logger method
        named by ``level``.
        """
        formatted = common.format_message(message, message2)
        emit = getattr(self.logger, level)
        emit(formatted)

    #
    # List tasks based on status pending, in_progress or processed
    #

    def list_pending_tasks(self, limit=None):
        """
        List the tasks whose status is 'pending'.

        :param limit: Optional maximum number of rows.
        :return: List of task rows (as dictionaries); empty list when there are none.
        """
        rows = build_tasks_query_full_task_list('pending', self.sort_by, self.sort_order, limit)
        return list(rows) if rows else []

    def list_in_progress_tasks(self, limit=None):
        """
        List the tasks whose status is 'in_progress'.

        :param limit: Optional maximum number of rows.
        :return: List of task rows (as dictionaries); empty list when there are none.
        """
        rows = build_tasks_query_full_task_list('in_progress', self.sort_by, self.sort_order, limit)
        return list(rows) if rows else []

    def list_processed_tasks(self, limit=None):
        """
        List the tasks whose status is 'processed'.

        :param limit: Optional maximum number of rows.
        :return: List of task rows (as dictionaries); empty list when there are none.
        """
        rows = build_tasks_query_full_task_list('processed', self.sort_by, self.sort_order, limit)
        return list(rows) if rows else []

    #
    # Get first task in task list based on status pending or processed
    #

    def get_next_pending_tasks(self, local_only=False, library_names=None, library_tags=None):
        """
        Fetch the next 'pending' task that matches the given filters.

        The task's status is not modified here; use ``mark_item_in_progress``
        for that.

        :param local_only: Forwarded to ``fetch_next_task_filtered``.
        :param library_names: Forwarded to ``fetch_next_task_filtered``.
        :param library_tags: Forwarded to ``fetch_next_task_filtered``.
        :return: Whatever ``fetch_next_task_filtered`` returns for 'pending'.
        """
        return fetch_next_task_filtered(
            'pending',
            sort_by=self.sort_by,
            sort_order=self.sort_order,
            local_only=local_only,
            library_names=library_names,
            library_tags=library_tags,
        )

    def get_next_processed_tasks(self):
        """
        Fetch the next 'processed' task using this queue's sort settings.

        :return: Whatever ``fetch_next_task_filtered`` returns for 'processed'.
        """
        return fetch_next_task_filtered('processed', sort_by=self.sort_by, sort_order=self.sort_order)

    def requeue_tasks_at_bottom(self, task_id):
        """
        Ask a task handler to reorder the given task to the 'bottom'.

        :param task_id: ID of the task to move.
        :return: Result of ``Task.reorder_tasks``.
        """
        return task.Task().reorder_tasks([task_id], 'bottom')

    #
    # Check if a particular task list is empty
    #

    @staticmethod
    def task_list_pending_is_empty():
        """Return True when no task has the status 'pending'."""
        # Only one result is fetched; that is enough to know if any exist
        pending_count = build_tasks_count_query('pending')
        return not (pending_count > 0)

    @staticmethod
    def task_list_in_progress_is_empty():
        """Return True when no task has the status 'in_progress'."""
        # Only one result is fetched; that is enough to know if any exist
        in_progress_count = build_tasks_count_query('in_progress')
        return not (in_progress_count > 0)

    @staticmethod
    def task_list_processed_is_empty():
        """Return True when no task has the status 'processed'."""
        # Only one result is fetched; that is enough to know if any exist
        processed_count = build_tasks_count_query('processed')
        return not (processed_count > 0)

    #
    # Set the status of a task item
    #

    @staticmethod
    def mark_item_in_progress(task_item):
        """
        Set the status of the given task to 'in_progress'.

        :param task_item: Object exposing ``set_status``.
        :return: The same ``task_item``, after the status call.
        """
        task_item.set_status('in_progress')
        return task_item

    @staticmethod
    def mark_item_as_processed(task_item):
        """
        Set the status of the given task to 'processed'.

        :param task_item: Object exposing ``set_status``.
        :return: The same ``task_item``, after the status call.
        """
        task_item.set_status('processed')
        return task_item