#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    unmanic.common.py

    Written by: Josh.5 <jsunnex@gmail.com>
    Date: 06 Dec 2018, (7:21 AM)

    Copyright:
        Copyright (C) Josh Sunnex - All Rights Reserved

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
    IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
    DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
    OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
    OR OTHER DEALINGS IN THE SOFTWARE.

"""
import copy
import datetime
import hashlib
import os
import random
import string
import shutil


def get_home_dir():
    # Attempt to get the HOME_DIR environment variable
    home_dir = os.environ.get('HOME_DIR')
    # If HOME_DIR is unset or empty, fall back to the current user's home directory
    if not home_dir:
        # Expand the tilde to the user's home directory
        home_dir = os.path.expanduser("~")
    else:
        # For any other value of HOME_DIR, ensure tilde and relative paths are correctly handled.
        # os.path.expanduser will handle a tilde but won't affect absolute paths;
        # os.path.abspath will convert a relative path to an absolute path.
        home_dir = os.path.abspath(os.path.expanduser(home_dir))
    return home_dir


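# Illustrative usage of get_home_dir() (editor's note, not part of the original module).
# Results depend on the environment, so the paths below are only examples:
#   HOME_DIR unset or empty    -> the current user's home directory, e.g. '/home/user'
#   HOME_DIR='~/unmanic-conf'  -> expanded to an absolute path, e.g. '/home/user/unmanic-conf'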
def get_default_root_path():
    root = os.path.join(os.sep)
    if os.name == "nt":
        root = os.path.join('c:', os.sep)
    return root


def get_default_library_path():
    library_path = os.path.join(get_default_root_path(), 'library')
    # On Windows, set the default library directory to %USERPROFILE%\Documents
    if os.name == "nt":
        library_path = os.path.join(os.path.expandvars(r'%USERPROFILE%'), 'Documents')
    return library_path


def get_default_cache_path():
    cache_path = os.path.join(get_default_root_path(), 'tmp', 'unmanic')
    # On Windows, set the default cache directory to %LOCALAPPDATA%\Temp\Unmanic
    if os.name == "nt":
        cache_path = os.path.join(os.path.expandvars(r'%LOCALAPPDATA%\Temp'), 'Unmanic')
    return cache_path


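# Illustrative defaults produced by the three helpers above (editor's note, not part of the
# original module; actual values depend on the platform and environment):
#   get_default_root_path()    -> '/' on POSIX, the 'c:' drive root on Windows
#   get_default_library_path() -> '/library' on POSIX, the user's Documents folder on Windows
#   get_default_cache_path()   -> '/tmp/unmanic' on POSIX, '%LOCALAPPDATA%\Temp\Unmanic' (expanded) on Windows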
def format_message(message, message2=''):
    message = str(message)
    if message2:
        # message2 can support other objects:
        if isinstance(message2, str):
            message = "%s - %s" % (message, str(message2))
        elif isinstance(message2, dict) or isinstance(message2, list):
            import pprint
            message2 = pprint.pformat(message2, indent=1)
            message = "%s \n%s" % (message, str(message2))
        else:
            message = "%s - %s" % (message, str(message2))
    message = "[FORMATTED] - %s" % message
    return message


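# Illustrative behaviour of format_message() (editor's sketch, not part of the original module):
#   format_message("Starting task")                -> "[FORMATTED] - Starting task"
#   format_message("Task failed", "no such file")  -> "[FORMATTED] - Task failed - no such file"
#   format_message("Task failed", {"retries": 3})  -> the dict is pretty-printed on a new line
#                                                     after the message, e.g. "[FORMATTED] - Task failed \n{'retries': 3}"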
def make_timestamp_human_readable(ts):
    """
    Accept a unix timestamp, return a human readable timedelta string.

    :param ts: a unix timestamp (integer / float)
    :returns: Human readable timedelta string (str)
    """
    units = ("year", "day", "hour", "minute", "second", "millisecond", "microsecond")
    precision = 1
    past_tense = "{} ago"
    future_tense = "in {}"

    # Get datetime from the given timestamp
    dt = datetime.datetime.fromtimestamp(ts)
    delta = datetime.datetime.now(tz=dt.tzinfo) - dt

    # Determine if this is past or future tense
    the_tense = future_tense if delta < datetime.timedelta(0) else past_tense

    # Create a dictionary of units
    delta = abs(delta)
    d = {
        "year": int(delta.days / 365),
        "day": int(delta.days % 365),
        "hour": int(delta.seconds / 3600),
        "minute": int(delta.seconds / 60) % 60,
        "second": delta.seconds % 60,
        "millisecond": delta.microseconds / 1000,
        "microsecond": delta.microseconds % 1000,
    }

    human_readable_list = []
    count = 0

    # Start building up the output in the human readable list.
    for unit in units:
        if count >= precision:
            break  # met precision
        if d[unit] == 0:
            continue  # skip 0's
        else:
            s = "" if d[unit] == 1 else "s"  # handle plurals
            human_readable_list.append("{} {}{}".format(d[unit], unit, s))
            count += 1

    return the_tense.format(", ".join(human_readable_list))


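# Illustrative usage of make_timestamp_human_readable() (editor's sketch, not part of the
# original module). With precision fixed at 1, only the largest non-zero unit is reported:
#   import time
#   make_timestamp_human_readable(time.time() - 90)    -> '1 minute ago'
#   make_timestamp_human_readable(time.time() + 3700)  -> 'in 1 hour'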
def ensure_dir(file_path):
    directory = os.path.dirname(file_path)
    if not os.path.exists(directory):
        os.makedirs(directory)


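# Illustrative usage of ensure_dir() (editor's note, not part of the original module).
# Note that it takes a *file* path and creates that file's parent directory:
#   ensure_dir('/tmp/unmanic/cache/video.mkv')  # hypothetical path; creates '/tmp/unmanic/cache' if missing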
def time_string_to_seconds(time_string):
    pt = datetime.datetime.strptime(time_string, '%H:%M:%S.%f')
    return pt.second + pt.minute * 60 + pt.hour * 3600


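# Illustrative usage of time_string_to_seconds() (editor's sketch, not part of the original
# module). The input must match '%H:%M:%S.%f'; any fractional seconds are discarded:
#   time_string_to_seconds('01:02:03.500')  -> 3723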
def tail(f, n, offset=0):
    """Read n lines from the end of the file object f, with an offset of `offset` lines."""
    avg_line_length = 153
    to_read = n + offset
    while 1:
        try:
            # Cast to int, as avg_line_length becomes a float after the first resize below
            f.seek(-int(avg_line_length * to_read), 2)
            while f.read(1) != b'\n':
                f.seek(-2, os.SEEK_CUR)
        except IOError:
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines
        avg_line_length *= 1.3


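# Illustrative usage of tail() (editor's sketch, not part of the original module).
# The file must be opened in binary mode, since the function seeks relative to the end
# of the file and compares raw bytes against b'\n':
#   with open('/var/log/unmanic.log', 'rb') as f:   # hypothetical log path
#       last_lines = tail(f, 20)                    # list of the last 20 lines, as bytes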
def touch(fname, mode=0o666, dir_fd=None, **kwargs):
    """Touch a file. If it does not exist, create it."""
    flags = os.O_CREAT | os.O_APPEND
    with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
        os.utime(f.fileno() if os.utime in os.supports_fd else fname,
                 dir_fd=None if os.supports_fd else dir_fd, **kwargs)


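# Illustrative usage of touch() (editor's note, not part of the original module):
#   touch('/tmp/unmanic.lock')  # hypothetical path; creates the file or updates its timestamps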
def clean_files_in_cache_dir(cache_directory):
    """Remove any Unmanic cache sub-directories found within the given directory"""
    if os.path.exists(cache_directory):
        for root, subFolders, files in os.walk(cache_directory):
            root_bn = os.path.basename(root)
            if root_bn.startswith("unmanic_file_conversion-"):
                try:
                    print("Clearing cache path - {}".format(root))
                    shutil.rmtree(root)
                except Exception as e:
                    print("Exception while clearing cache path - {}".format(str(e)))
            elif root_bn.startswith("unmanic_remote_pending_library-"):
                try:
                    print("Clearing remote library cache path - {}".format(root))
                    shutil.rmtree(root)
                except Exception as e:
                    print("Exception while clearing remote library cache path - {}".format(str(e)))


def random_string(string_length=5):
    """Generate a random string of fixed length"""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(string_length))


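# Illustrative usage of random_string() (editor's note, not part of the original module).
# Output is lowercase ASCII letters drawn from the random module, so it is not suitable
# for security-sensitive tokens:
#   random_string()   -> e.g. 'qbhzj'
#   random_string(8)  -> e.g. 'tmvexqla'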
def json_dump_to_file(json_data, out_file, check=True, rollback_on_fail=True):
    """Dump JSON data to a file. Optionally check that the written file contains valid JSON."""
    import json
    import time
    import tempfile
    import shutil

    result = {
        'errors': [],
        'success': False
    }

    # If the rollback_on_fail param is flagged and the out file already exists, create a temporary backup
    if rollback_on_fail and os.path.exists(out_file):
        temp_dir = tempfile.gettempdir()
        temp_path = os.path.join(temp_dir, 'json_dump_to_file_backup-{}'.format(time.time()))
        try:
            shutil.copy2(out_file, temp_path)
            result['temp_path'] = temp_path
        except Exception as e:
            result['success'] = False
            result['errors'].append("Failed to create temporary file - {}".format(str(e)))

    # Write data to out_file
    try:
        with open(out_file, 'w') as outfile:
            json.dump(json_data, outfile, sort_keys=True, indent=4)
        result['success'] = True
    except Exception as e:
        result['success'] = False
        result['errors'].append("Exception in writing to file: {}".format(str(e)))

    # If check param is flagged, ensure valid JSON data exists in the output file
    if check:
        try:
            with open(out_file) as infile:
                data = json.load(infile)
        except Exception as e:
            result['success'] = False
            result['errors'].append("JSON file invalid - {}".format(e))

    # If the data save was unsuccessful, the rollback_on_fail param is flagged
    # and there is a temp file set, roll back to the old file
    if not result.get('success') and result.get('temp_path') and rollback_on_fail:
        try:
            os.remove(out_file)
            shutil.copy2(result.get('temp_path'), out_file)
            os.remove(result.get('temp_path'))
        except Exception as e:
            result['success'] = False
            result['errors'].append("Exception while restoring original file: {}".format(str(e)))

    return result


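# Illustrative usage of json_dump_to_file() (editor's sketch, not part of the original module):
#   result = json_dump_to_file({'debugging': False}, '/tmp/settings.json')  # hypothetical path
#   if not result['success']:
#       for error in result['errors']:
#           print(error)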
def extract_video_codecs_from_file_properties(file_properties: dict):
    """
    Read a dictionary of file properties.
    Extract a list of video codecs from the video streams.

    :param file_properties:
    :return:
    """
    codecs = []
    for stream in file_properties['streams']:
        if stream['codec_type'] == 'video':
            codecs.append(stream['codec_name'])
    return codecs


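# Illustrative usage of extract_video_codecs_from_file_properties() (editor's sketch, not part
# of the original module). The argument is assumed to be ffprobe-style data with a 'streams' list:
#   probe_data = {'streams': [{'codec_type': 'video', 'codec_name': 'h264'},
#                             {'codec_type': 'audio', 'codec_name': 'aac'}]}
#   extract_video_codecs_from_file_properties(probe_data)  -> ['h264']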
def get_file_checksum(path):
    """
    Compute an MD5 checksum of a file.

    Rather than reading the whole file into memory, read it in chunks.
    This is slightly slower, but allows working on systems with limited memory.

    :param path:
    :return:
    """
    file_hash = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b''):
            file_hash.update(chunk)
    return copy.copy(file_hash.hexdigest())
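# Illustrative usage of get_file_checksum() (editor's note, not part of the original module):
#   get_file_checksum('/library/video.mkv')  # hypothetical path
#   -> '9e107d9d372bb6826bd81d3542a419d6'    # example MD5 hex digest string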