kapsikkum-unmanic – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3  
4 """
5 unmanic.test_taskhandler.py
6  
7 Written by: Josh.5 <jsunnex@gmail.com>
8 Date: 08 May 2020, (12:28 PM)
9  
10 Copyright:
11 Copyright (C) Josh Sunnex - All Rights Reserved
12  
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19  
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22  
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
26 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
27 DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
28 OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
29 OR OTHER DEALINGS IN THE SOFTWARE.
30  
31 """
32 import os
33 import pytest
34 import time
35 import tempfile
36 import threading
37  
38 from tests.support_.test_data import data_queues, mock_jobqueue_class
39 from unmanic.libs.taskhandler import TaskHandler
40 from unmanic.libs.unmodels.tasks import Tasks
41  
42  
class TestClass(object):
    """
    TestClass

    Test the TaskHandler object

    A new TaskHandler is created and started as a daemon thread before every
    test method (see setup_method) and is stopped and joined again afterwards
    (see teardown_method).

    """

    db_connection = None

    @classmethod
    def setup_class(cls):
        """
        Setup the class state for pytest

        Collects the shared data queues, creates a mock job queue, opens an
        in-memory SQLite database containing the Tasks table and builds a
        Config object that points at a temporary config directory.

        :return:
        """
        cls.project_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        cls.data_queues = data_queues.data_queues
        cls.scheduledtasks = cls.data_queues["scheduledtasks"]
        cls.inotifytasks = cls.data_queues["inotifytasks"]
        cls.progress_reports = cls.data_queues["progress_reports"]
        cls.task_queue = mock_jobqueue_class.MockJobQueue()
        cls.event = None
        cls.task_handler = None

        # Create temp config path
        # NOTE(review): this directory is never removed again. Deleting it in
        # teardown_class looks desirable, but confirm first that no other test
        # keeps using the Config object created below.
        config_path = tempfile.mkdtemp(prefix='unmanic_tests_')

        # Create connection to a test DB
        app_dir = os.path.dirname(os.path.abspath(__file__))
        database_settings = {
            "TYPE": "SQLITE",
            "FILE": ':memory:',
            "MIGRATIONS_DIR": os.path.join(app_dir, 'migrations'),
        }
        from unmanic.libs.unmodels.lib import Database
        cls.db_connection = Database.select_database(database_settings)

        # Create required tables
        cls.db_connection.create_tables([Tasks])

        # import config
        from unmanic import config
        cls.settings = config.Config(config_path=config_path)
        cls.settings.set_config_item('debugging', True, save_settings=False)

    @classmethod
    def teardown_class(cls):
        """
        Teardown any state that was previously setup with a call to
        setup_class.

        Currently nothing is cleaned up here.

        :return:
        """
        pass

    def setup_method(self):
        """
        Setup any state tied to the execution of the given method in a
        class.
        setup_method is invoked for every test method of a class.

        Creates a fresh event and TaskHandler and starts the handler as a
        daemon thread.

        :return:
        """
        # Reset the mock queue before the handler thread starts so that the
        # reset cannot overwrite anything the running thread has added.
        self.task_queue.added_item = None
        self.event = threading.Event()
        self.task_handler = TaskHandler(self.data_queues, self.task_queue, self.event)
        self.task_handler.daemon = True
        self.task_handler.start()

    def teardown_method(self):
        """
        Teardown any state that was previously setup with a setup_method
        call.

        Stops the TaskHandler thread and waits for it to finish.

        :return:
        """
        self.task_handler.stop()
        self.task_handler.join()

    @pytest.mark.integrationtest
    def test_task_handler_runs_as_a_thread(self):
        """
        The handler started in setup_method must be alive.
        """
        assert self.task_handler.is_alive()

    @pytest.mark.integrationtest
    def test_task_handler_thread_can_stop_in_less_than_two_seconds(self):
        """
        After being asked to stop, the handler thread must have exited
        within two seconds.
        """
        self.event.set()
        self.task_handler.stop()
        # Wait for the thread to exit, but never longer than two seconds.
        # Unlike a fixed sleep this returns as soon as the thread finishes.
        self.task_handler.join(timeout=2)
        assert not self.task_handler.is_alive()

    @pytest.mark.integrationtest
    @pytest.mark.skip(reason="This test needs to be re-written to work with the TaskHandler DB integration")
    def test_task_handler_can_process_scheduled_tasks_queue(self):
        """
        An item put on the scheduled tasks queue should end up on the
        (mock) task queue once the queue has been processed.
        """
        test_path_string = 'scheduledtasks'
        self.scheduledtasks.put(test_path_string)
        self.task_handler.process_scheduledtasks_queue()
        assert test_path_string == self.task_queue.added_item

    @pytest.mark.integrationtest
    @pytest.mark.skip(reason="This test needs to be re-written to work with the TaskHandler DB integration")
    def test_task_handler_can_process_inotify_tasks_queue(self):
        """
        An item put on the inotify tasks queue should end up on the
        (mock) task queue once the queue has been processed.
        """
        test_path_string = '/tests/support_/videos/small/big_buck_bunny_144p_1mb.3gp'
        self.inotifytasks.put(test_path_string)
        self.task_handler.process_inotifytasks_queue()
        assert test_path_string == self.task_queue.added_item
150  
if __name__ == '__main__':
    # Allow this test module to be executed directly as a script by handing
    # this file to the pytest runner with the same options as before.
    pytest_args = ['-s', '--log-cli-level=INFO', __file__]
    pytest.main(pytest_args)