kapsikkum-unmanic – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3  
4 """
5 unmanic.common.py
6  
7 Written by: Josh.5 <jsunnex@gmail.com>
8 Date: 06 Dec 2018, (7:21 AM)
9  
10 Copyright:
11 Copyright (C) Josh Sunnex - All Rights Reserved
12  
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19  
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22  
23 THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
24 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
26 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
27 DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
28 OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
29 OR OTHER DEALINGS IN THE SOFTWARE.
30  
31 """
32 import copy
33 import datetime
34 import hashlib
35 import os
36 import random
37 import string
38 import shutil
39  
40  
def get_home_dir():
    """
    Resolve the directory that should be treated as the home directory.

    The HOME_DIR environment variable takes priority. When it is unset or
    empty, the current user's home directory ("~" expanded) is used instead.

    :returns: Path to the home directory (Str)
    """
    configured = os.environ.get('HOME_DIR')
    if configured:
        # expanduser resolves a leading "~"; abspath then turns a relative
        # value into an absolute one (absolute values stay absolute).
        return os.path.abspath(os.path.expanduser(configured))
    # Nothing configured - fall back to the current user's home directory
    return os.path.expanduser("~")
54  
55  
def get_default_root_path():
    """
    Return the filesystem root used as the base for the default paths.

    :returns: 'c:' joined with the path separator when os.name is "nt",
              otherwise the path separator on its own (Str)
    """
    if os.name == "nt":
        return os.path.join('c:', os.sep)
    return os.path.join(os.sep)
61  
62  
def get_default_library_path():
    """
    Return the default location of the media library.

    :returns: the user's Documents directory when os.name is "nt",
              otherwise a 'library' directory under the default root path (Str)
    """
    if os.name == "nt":
        # Windows set the default library directory into %USERPROFILE%\Documents
        user_profile = os.path.expandvars(r'%USERPROFILE%')
        return os.path.join(user_profile, 'Documents')
    return os.path.join(get_default_root_path(), 'library')
69  
70  
def get_default_cache_path():
    """
    Return the default location of the cache directory.

    :returns: an 'Unmanic' directory inside the local application data Temp
              directory when os.name is "nt", otherwise 'tmp/unmanic' under
              the default root path (Str)
    """
    if os.name == "nt":
        # Windows set the default temp directory into %LOCALAPPDATA%\Temp\Unmanic
        windows_temp = os.path.expandvars(r'%LOCALAPPDATA%\Temp')
        return os.path.join(windows_temp, 'Unmanic')
    return os.path.join(get_default_root_path(), 'tmp', 'unmanic')
77  
78  
def format_message(message, message2=''):
    """
    Combine a message and an optional extra value into one formatted string.

    :param message: the primary message; converted with str()
    :param message2: optional extra detail. Ignored when falsy. dict and list
                     values are pretty-printed on a new line, every other
                     value is appended after " - ".
    :returns: the combined text prefixed with "[FORMATTED] - " (Str)
    """
    text = str(message)
    if message2:
        if isinstance(message2, (dict, list)):
            # Containers read better pretty-printed below the main message
            import pprint
            pretty = pprint.pformat(message2, indent=1)
            text = "%s \n%s" % (text, str(pretty))
        else:
            # Strings and any other object type share the same layout
            text = "%s - %s" % (text, str(message2))
    return "[FORMATTED] - %s" % text
93  
94  
def make_timestamp_human_readable(ts):
    """
    Accept a unix timestamp, return a human readable timedelta string.

    The difference between now and the timestamp is reported using only its
    largest non-zero unit, e.g. "3 hours ago" or "in 2 days".

    :param ts: unix timestamp in seconds (integer / float)
    :returns: Human readable timedelta string (Str)
    """
    units = ("year", "day", "hour", "minute", "second", "millisecond", "microsecond")
    precision = 1
    past_tense = "{} ago"
    future_tense = "in {}"

    # fromtimestamp() without a tz yields a naive local datetime, so
    # dt.tzinfo is None and now() is naive local time as well.
    dt = datetime.datetime.fromtimestamp(ts)
    delta = datetime.datetime.now(tz=dt.tzinfo) - dt

    # Determine if this is past or future tense
    the_tense = future_tense if delta < datetime.timedelta(0) else past_tense

    # Create a dictionary of units (all whole numbers)
    delta = abs(delta)
    d = {
        "year":        delta.days // 365,
        "day":         delta.days % 365,
        "hour":        delta.seconds // 3600,
        "minute":      (delta.seconds // 60) % 60,
        "second":      delta.seconds % 60,
        # Floor division is required here: true division yielded floats and
        # therefore output such as "123.456 milliseconds ago".
        "millisecond": delta.microseconds // 1000,
        "microsecond": delta.microseconds % 1000,
    }

    human_readable_list = []

    # Build up the output, largest unit first, until the precision is met.
    for unit in units:
        if len(human_readable_list) >= precision:
            break  # met precision
        value = d[unit]
        if value == 0:
            continue  # skip 0's
        plural = "" if value == 1 else "s"  # handle plurals
        human_readable_list.append("{} {}{}".format(value, unit, plural))

    return the_tense.format(", ".join(human_readable_list))
141  
142  
def ensure_dir(file_path):
    """
    Make sure the directory that will contain a file exists.

    Only the directory component of file_path (os.path.dirname) is created;
    the file itself is never touched.

    :param file_path: path to a file whose parent directory is required
    :returns: None
    """
    directory = os.path.dirname(file_path)
    if not directory:
        # A bare file name has no directory component. os.makedirs('') would
        # raise, and there is nothing to create for it anyway.
        return
    if not os.path.exists(directory):
        # exist_ok guards against another thread/process creating the
        # directory between the check above and this call.
        os.makedirs(directory, exist_ok=True)
147  
148  
def time_string_to_seconds(time_string):
    """
    Convert a "HH:MM:SS[.fraction]" time string into whole seconds.

    The fractional part is optional and is discarded. The hours field is not
    limited to 0-23, so durations of a day or longer (e.g. "25:00:00.00")
    are accepted.

    :param time_string: time string such as "00:01:02.50" (Str)
    :returns: total number of whole seconds (Int)
    :raises TypeError: if time_string is not a string
    :raises ValueError: if time_string is not in the expected format
    """
    if not isinstance(time_string, str):
        raise TypeError("time_string must be str, not {}".format(type(time_string).__name__))

    error_message = "time data {!r} does not match format 'HH:MM:SS[.fraction]'".format(time_string)

    fields = time_string.split(':')
    if len(fields) != 3:
        raise ValueError(error_message)
    hours, minutes, seconds = fields
    seconds, dot, fraction = seconds.partition('.')

    # Every field must consist of digits only; when a '.' is present it must
    # be followed by at least one digit.
    digit_fields = [hours, minutes, seconds]
    if dot:
        digit_fields.append(fraction)
    if not all(field.isdecimal() for field in digit_fields):
        raise ValueError(error_message)

    hours, minutes, seconds = int(hours), int(minutes), int(seconds)
    if minutes > 59 or seconds > 59:
        raise ValueError(error_message)

    return seconds + minutes * 60 + hours * 3600
152  
153  
def tail(f, n, offset=0):
    """
    Read the trailing lines of a file object.

    Seeks backwards from the end of f by an estimated number of bytes, reads
    to the end and, if that did not yield n + offset lines, grows the estimate
    and tries again until enough lines are found or the start of the file is
    reached.

    :param f: seekable file object; line breaks are located by comparing
              single bytes read from it, so it should be opened in binary mode
    :param n: number of lines wanted
    :param offset: additional number of lines to read back beyond n
    :returns: list of lines from the position reached to the end of the file.
              This holds at least n + offset lines when the file has that
              many, and may hold more - the result is not trimmed.
    """
    avg_line_length = 153
    to_read = n + offset
    while True:
        try:
            # seek() only accepts integers, while the estimate becomes a float
            # once it has been grown by the factor at the bottom of the loop.
            f.seek(-int(avg_line_length * to_read), os.SEEK_END)
            # Walk backwards one byte at a time until just past a newline so
            # the first line returned is a complete one.
            while f.read(1) != b'\n':
                f.seek(-2, os.SEEK_CUR)
        except IOError:
            # Tried to move before the start of the file - read all of it
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines
        # Not enough lines yet: assume longer lines and look further back
        avg_line_length *= 1.3
170  
171  
def touch(fname, mode=0o666, dir_fd=None, **kwargs):
    """
    Touch a file. If it does not exist, create it.

    :param fname: path of the file to touch
    :param mode: permission bits passed to os.open when the file is created
    :param dir_fd: optional directory file descriptor that fname is relative to
    :param kwargs: extra keyword arguments forwarded to os.utime (e.g. times=)
    :returns: None
    """
    flags = os.O_CREAT | os.O_APPEND
    # Whether os.utime can work on an open file descriptor on this platform.
    # The previous check tested the os.supports_fd set itself, which is always
    # truthy, so dir_fd was dropped even when utime was called with the path.
    utime_by_fd = os.utime in os.supports_fd
    with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
        os.utime(f.fileno() if utime_by_fd else fname,
                 dir_fd=None if utime_by_fd else dir_fd, **kwargs)
178  
179  
def clean_files_in_cache_dir(cache_directory):
    """
    Remove temporary working directories from the cache directory.

    Walks cache_directory and deletes every directory (at any depth) whose
    name starts with "unmanic_file_conversion-" or
    "unmanic_remote_pending_library-". Anything else is left in place.
    Failures are printed and do not stop the clean-up.

    :param cache_directory: path of the cache directory to clean
    :returns: None
    """
    if not os.path.exists(cache_directory):
        return

    # (directory name prefix, label used in the printed messages)
    removable = (
        ("unmanic_file_conversion-", "cache path"),
        ("unmanic_remote_pending_library-", "remote library cache path"),
    )

    for root, sub_folders, _files in os.walk(cache_directory):
        root_bn = os.path.basename(root)
        for prefix, label in removable:
            if not root_bn.startswith(prefix):
                continue
            try:
                print("Clearing {} - {}".format(label, root))
                shutil.rmtree(root)
            except Exception as e:
                print("Exception while clearing {} - {}".format(label, str(e)))
            else:
                # The whole tree is gone - stop os.walk from trying to
                # descend into sub-directories that no longer exist.
                del sub_folders[:]
            break
197  
198  
def random_string(string_length=5):
    """
    Generate a random string of lowercase ASCII letters.

    :param string_length: number of characters to generate
    :returns: the generated string (Str)
    """
    alphabet = string.ascii_lowercase
    picked = []
    for _ in range(string_length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
203  
204  
def json_dump_to_file(json_data, out_file, check=True, rollback_on_fail=True):
    """
    Dump json data to a file. Optionally checks that the output json data is valid.

    :param json_data: the data to serialise
    :param out_file: path of the file to write
    :param check: when set, re-read out_file afterwards and fail if it does
                  not contain valid json
    :param rollback_on_fail: when set and out_file already exists, back it up
                             first and restore it if the save fails
    :returns: dict with 'success' (Bool) and 'errors' (list of Str). When a
              backup was taken it also carries 'temp_path', the location the
              backup was stored at; the backup is removed again before
              returning.
    """
    import json
    import time
    import tempfile
    import shutil

    result = {
        'errors':  [],
        'success': False
    }

    # If the rollback_on_fail param is flagged and there already exists an
    # out file, create a temporary backup
    if rollback_on_fail and os.path.exists(out_file):
        temp_dir = tempfile.gettempdir()
        temp_path = os.path.join(temp_dir, 'json_dump_to_file_backup-{}'.format(time.time()))
        try:
            shutil.copy2(out_file, temp_path)
            result['temp_path'] = temp_path
        except Exception as e:
            result['success'] = False
            result['errors'].append("Failed to create temporary file - {}".format(str(e)))

    # Write data to out_file
    try:
        with open(out_file, 'w') as outfile:
            json.dump(json_data, outfile, sort_keys=True, indent=4)
        result['success'] = True
    except Exception as e:
        result['success'] = False
        result['errors'].append("Exception in writing to file: {}".format(str(e)))

    # If check param is flagged, ensure json data exists in the output file
    if check:
        try:
            with open(out_file) as infile:
                json.load(infile)
        except Exception as e:
            result['success'] = False
            result['errors'].append("JSON file invalid - {}".format(e))

    backup_path = result.get('temp_path')
    if backup_path and rollback_on_fail:
        if not result.get('success'):
            # The save was unsuccessful - roll back to the old file
            try:
                os.remove(out_file)
                shutil.copy2(backup_path, out_file)
                os.remove(backup_path)
            except Exception as e:
                result['success'] = False
                result['errors'].append("Exception while restoring original file file: {}".format(str(e)))
        else:
            # The save worked, so the backup is no longer needed. Without this
            # every successful call left one more file in the temp directory.
            try:
                os.remove(backup_path)
            except OSError:
                # A stale backup does not make the save itself a failure
                pass

    return result
259  
260  
def extract_video_codecs_from_file_properties(file_properties: dict):
    """
    Collect the codec names of all video streams in a file properties dictionary.

    :param file_properties: dictionary with a 'streams' list; each stream is a
                            dictionary with a 'codec_type' and, for video
                            streams, a 'codec_name'
    :return: list of codec names, one per video stream, in stream order
    """
    return [
        stream['codec_name']
        for stream in file_properties['streams']
        if stream['codec_type'] == 'video'
    ]
274  
275  
def get_file_checksum(path):
    """
    Calculate the MD5 checksum of a file.

    The file is read in 8192 byte blocks instead of being loaded into memory
    in one go, which keeps memory use low for large files.

    :param path: path of the file to hash
    :return: hexadecimal MD5 digest of the file contents (Str)
    """
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        while True:
            block = handle.read(8192)
            if block == b'':
                break  # end of file
            digest.update(block)
    return digest.hexdigest()
286 file_hash = hashlib.md5()
287 with open(path, "rb") as f:
288 for chunk in iter(lambda: f.read(8192), b''):
289 file_hash.update(chunk)
290 return copy.copy(file_hash.hexdigest())