kapsikkum-unmanic – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | #!/usr/bin/env python3 |
2 | # -*- coding: utf-8 -*- |
||
3 | |||
4 | """ |
||
5 | unmanic.taskqueue.py |
||
6 | |||
7 | Written by: Josh.5 <jsunnex@gmail.com> |
||
8 | Date: 23 Apr 2019, (19:17 PM) |
||
9 | |||
10 | Copyright: |
||
11 | Copyright (C) Josh Sunnex - All Rights Reserved |
||
12 | |||
13 | Permission is hereby granted, free of charge, to any person obtaining a copy |
||
14 | of this software and associated documentation files (the "Software"), to deal |
||
15 | in the Software without restriction, including without limitation the rights |
||
16 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||
17 | copies of the Software, and to permit persons to whom the Software is |
||
18 | furnished to do so, subject to the following conditions: |
||
19 | |||
20 | The above copyright notice and this permission notice shall be included in all |
||
21 | copies or substantial portions of the Software. |
||
22 | |||
23 | THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, |
||
24 | EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
||
25 | MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
||
26 | IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, |
||
27 | DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR |
||
28 | OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE |
||
29 | OR OTHER DEALINGS IN THE SOFTWARE. |
||
30 | |||
31 | """ |
||
32 | |||
33 | from unmanic.libs import task |
||
34 | from unmanic.libs import common |
||
35 | from unmanic.libs.unmodels import Libraries, LibraryTags, Tags |
||
36 | from unmanic.libs.unmodels.tasks import Tasks |
||
37 | |||
38 | """ |
||
39 | |||
40 | An object to contain all details of the job queue in such a way that it is presented in a synchronous list |
||
41 | while being able to be accessed by a number of threads simultaneously |
||
42 | |||
43 | """ |
||
44 | |||
45 | |||
def build_tasks_count_query(status):
    """
    Report whether any tasks exist for the given status.

    Returns 0 when no task has the given status and a non-zero count otherwise.
    The select is limited to a single row before it is counted, because callers
    only need to know whether anything matches at all.

    # TODO: look into peewee dynamic query building (surely this exists)

    :param status: Value compared against ``Tasks.status``.
    :return: Result of ``count()`` on the one-row-limited query.
    """
    # Filter by status
    matching_tasks = Tasks.select().where((Tasks.status == status))
    # Fetch only one result; that is enough to know whether there are any at all
    first_match_only = matching_tasks.limit(1)
    return first_match_only.count()
||
60 | |||
61 | |||
def build_tasks_query(status, sort_by='id', sort_order='asc', local_only=False, library_names=None, library_tags=None):
    """
    Return the first task item in the task list filtered by status
    and sorted by the given sort_by and sort_order values.

    :param status: Value compared against ``Tasks.status``.
    :param sort_by: Either a ``Tasks`` field (e.g. ``Tasks.priority``) or the name of one as a string.
    :param sort_order: 'asc' for ascending; any other value sorts descending.
    :param local_only: When true, only tasks whose ``type`` is 'local' are considered.
    :param library_names: Optional collection of library names; when not None only tasks in those libraries match.
    :param library_tags: Optional collection of tag names. A non-empty collection matches libraries carrying
                         any of those tags; an empty collection matches only libraries that have no tags.
    :return: Whatever ``first()`` yields for the built query (the first matching row, falsy when nothing matches).
    """
    # A plain field name (such as the default 'id') has no .asc()/.desc() methods,
    # so resolve it to the matching field on the Tasks model before sorting.
    if isinstance(sort_by, str):
        sort_by = getattr(Tasks, sort_by)

    # Base query: filter by status
    query = Tasks.select().where((Tasks.status == status))

    # Optionally restrict to local tasks
    if local_only:
        query = query.where((Tasks.type == 'local'))

    # Join the owning library so that library name / tag filters can be applied
    query = query.join(Libraries, on=(Libraries.id == Tasks.library_id))
    if library_names is not None:
        query = query.where(Libraries.name.in_(library_names))
    if library_tags is not None:
        # Outer joins keep libraries that have no tags at all in the result set
        query = query.join(LibraryTags, join_type='LEFT OUTER JOIN')
        query = query.join(Tags, join_type='LEFT OUTER JOIN')
        if library_tags:
            query = query.where(Tags.name.in_(library_tags))
        else:
            # Handle a query where the list is empty. In this case we want to match for only libraries that have no tags
            query = query.where(Tags.name.is_null())

    # Limit to one result
    query = query.limit(1)
    if sort_order == 'asc':
        query = query.order_by(sort_by.asc())
    else:
        query = query.order_by(sort_by.desc())
    return query.first()
||
101 | |||
102 | |||
def build_tasks_query_full_task_list(status, sort_by='id', sort_order='asc', limit=None):
    """
    Return all task items in the task list filtered by status.
    The query is sorted by the given sort_by and sort_order values
    and may be limited by the limit variable.

    :param status: Value compared against ``Tasks.status``.
    :param sort_by: Either a ``Tasks`` field (e.g. ``Tasks.priority``) or the name of one as a string.
    :param sort_order: 'asc' for ascending; any other value sorts descending.
    :param limit: Optional maximum number of rows; a falsy value (None, 0) means no limit is applied.
    :return: The built query with ``dicts()`` applied, so rows are produced as dictionaries.
    """
    # A plain field name (such as the default 'id') has no .asc()/.desc() methods,
    # so resolve it to the matching field on the Tasks model before sorting.
    if isinstance(sort_by, str):
        sort_by = getattr(Tasks, sort_by)

    query = Tasks.select(Tasks).where((Tasks.status == status))

    # Set the sort order
    if sort_order == 'asc':
        query = query.order_by(sort_by.asc())
    else:
        query = query.order_by(sort_by.desc())

    # Set query limit if one was given
    if limit:
        query = query.limit(limit)

    # Return results as dictionary
    return query.dicts()
||
129 | |||
130 | |||
def fetch_next_task_filtered(status, sort_by='id', sort_order='asc', local_only=False, library_names=None, library_tags=None):
    """
    Look up the next task for a given status and wrap it in a ``task.Task`` object.

    All arguments are forwarded unchanged to ``build_tasks_query``.

    :param status:
    :param sort_by:
    :param sort_order:
    :param local_only:
    :param library_names:
    :param library_tags:
    :return: A ``task.Task`` loaded via the matched row's ``abspath``, or ``False`` when no row matched.
    """
    # Look the row up first so that a Task object is only built when one exists
    matched_row = build_tasks_query(status, sort_by=sort_by, sort_order=sort_order, local_only=local_only,
                                    library_names=library_names, library_tags=library_tags)
    if matched_row:
        # Populate a fresh Task object from the row's absolute path and hand it back
        next_task = task.Task()
        next_task.read_and_set_task_by_absolute_path(matched_row.abspath)
        return next_task
    return False
||
152 | |||
153 | |||
class TaskQueue(object):
    """
    TaskQueue

    Query and status helper around the task list.
    Task items are fetched by status ('pending', 'in_progress', 'processed')
    using the sort field and sort order configured on the instance.

    Attributes:
        data_queues: Container indexed by name. Its "logging" entry provides ``get_logger``.
        logger: Logger obtained from ``data_queues["logging"]`` for this class name.
        sort_by: Field used to order task queries (``Tasks.priority``).
        sort_order: Sort direction used for task queries ('desc').

    """

    def __init__(self, data_queues):
        self.name = 'TaskQueue'
        self.data_queues = data_queues
        self.logger = data_queues["logging"].get_logger(self.name)

        # Sort fields
        self.sort_by = Tasks.priority
        self.sort_order = 'desc'

    def _log(self, message, message2='', level="info"):
        """
        Format the message parts and emit them on the logger method named by ``level``.

        :param message:
        :param message2:
        :param level: Name of the logger method to call (e.g. "info").
        :return:
        """
        formatted = common.format_message(message, message2)
        emit = getattr(self.logger, level)
        emit(formatted)

    # List tasks based on status pending, in_progress or processed

    def list_pending_tasks(self, limit=None):
        """
        Returns a list of 'pending' tasks
        Can limit to <limit> results

        :param limit:
        :return: List of task dictionaries (empty list when there are none).
        """
        pending = build_tasks_query_full_task_list('pending', self.sort_by, self.sort_order, limit)
        return list(pending) if pending else []

    def list_in_progress_tasks(self, limit=None):
        """
        Returns a list of 'in_progress' tasks
        Can limit to <limit> results

        :param limit:
        :return: List of task dictionaries (empty list when there are none).
        """
        in_progress = build_tasks_query_full_task_list('in_progress', self.sort_by, self.sort_order, limit)
        return list(in_progress) if in_progress else []

    def list_processed_tasks(self, limit=None):
        """
        Returns a list of 'processed' tasks
        Can limit to <limit> results

        :param limit:
        :return: List of task dictionaries (empty list when there are none).
        """
        processed = build_tasks_query_full_task_list('processed', self.sort_by, self.sort_order, limit)
        return list(processed) if processed else []

    # Get first task in task list based on status pending or processed

    def get_next_pending_tasks(self, local_only=False, library_names=None, library_tags=None):
        """
        Fetch the next pending task matching the given filters and return it.
        This method does not change the task's status itself.

        :param local_only:
        :param library_names:
        :param library_tags:
        :return: The value returned by ``fetch_next_task_filtered`` for status 'pending'.
        """
        return fetch_next_task_filtered('pending', sort_by=self.sort_by, sort_order=self.sort_order,
                                        local_only=local_only, library_names=library_names,
                                        library_tags=library_tags)

    def get_next_processed_tasks(self):
        """
        Fetch the next processed task using the instance sort settings.

        :return: The value returned by ``fetch_next_task_filtered`` for status 'processed'.
        """
        return fetch_next_task_filtered('processed', sort_by=self.sort_by, sort_order=self.sort_order)

    def requeue_tasks_at_bottom(self, task_id):
        """
        Ask a task handler to reorder the given task to the 'bottom'.

        :param task_id:
        :return: The result of ``reorder_tasks``.
        """
        return task.Task().reorder_tasks([task_id], 'bottom')

    # Check if a particular task list is empty

    @staticmethod
    def task_list_pending_is_empty():
        """
        :return: True when no 'pending' tasks exist, otherwise False.
        """
        # A single-row count is enough to know whether there are any at all
        return not (build_tasks_count_query('pending') > 0)

    @staticmethod
    def task_list_in_progress_is_empty():
        """
        :return: True when no 'in_progress' tasks exist, otherwise False.
        """
        # A single-row count is enough to know whether there are any at all
        return not (build_tasks_count_query('in_progress') > 0)

    @staticmethod
    def task_list_processed_is_empty():
        """
        :return: True when no 'processed' tasks exist, otherwise False.
        """
        # A single-row count is enough to know whether there are any at all
        return not (build_tasks_count_query('processed') > 0)

    # Set the status of a task item

    @staticmethod
    def mark_item_in_progress(task_item):
        """
        Set the given task status as 'in_progress' and then return it.

        :param task_item: Object exposing ``set_status``.
        :return: The same task_item that was passed in.
        """
        task_item.set_status('in_progress')
        return task_item

    @staticmethod
    def mark_item_as_processed(task_item):
        """
        Set the given task status as 'processed' and then return it.

        :param task_item: Object exposing ``set_status``.
        :return: The same task_item that was passed in.
        """
        task_item.set_status('processed')
        return task_item
||
304 | return task_item |