16 import multiprocessing
18 multiprocessing = None
23 # Test Execution Implementation
25 class LockedValue(object):
26 def __init__(self, value):
27 self.lock = threading.Lock()
37 def _set_value(self, value):
44 value = property(_get_value, _set_value)
46 class TestProvider(object):
47 def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
48 self.canceled_flag = canceled_flag
50 # Create a shared queue to provide the test indices.
51 self.queue = queue_impl()
52 for i in range(len(tests)):
54 for i in range(num_jobs):
58 self.canceled_flag.value = 1
61 # Check if we are canceled.
62 if self.canceled_flag.value:
65 # Otherwise take the next test.
66 return self.queue.get()
69 def __init__(self, run_instance, provider, consumer):
70 self.run_instance = run_instance
71 self.provider = provider
72 self.consumer = consumer
76 item = self.provider.get()
80 self.consumer.task_finished()
82 def run_test(self, test_index):
83 test = self.run_instance.tests[test_index]
85 self.run_instance.execute_test(test)
86 except KeyboardInterrupt:
87 # This is a sad hack. Unfortunately subprocess goes
88 # bonkers with ctrl-c and we start forking merrily.
89 print('\nCtrl-C detected, goodbye.')
91 self.consumer.update(test_index, test)
93 class ThreadResultsConsumer(object):
94 def __init__(self, display):
95 self.display = display
96 self.lock = threading.Lock()
98 def update(self, test_index, test):
101 self.display.update(test)
105 def task_finished(self):
108 def handle_results(self):
111 class MultiprocessResultsConsumer(object):
112 def __init__(self, run, display, num_jobs):
114 self.display = display
115 self.num_jobs = num_jobs
116 self.queue = multiprocessing.Queue()
118 def update(self, test_index, test):
119 # This method is called in the child processes, and communicates the
120 # results to the actual display implementation via an output queue.
121 self.queue.put((test_index, test.result))
123 def task_finished(self):
124 # This method is called in the child processes, and communicates that
125 # individual tasks are complete.
128 def handle_results(self):
129 # This method is called in the parent, and consumes the results from the
130 # output queue and dispatches to the actual display. The method will
131 # complete after each of num_jobs tasks has signalled completion.
133 while completed != self.num_jobs:
134 # Wait for a result item.
135 item = self.queue.get()
140 # Update the test result in the parent process.
142 test = self.run.tests[index]
145 self.display.update(test)
147 def run_one_tester(run, provider, display):
148 tester = Tester(run, provider, display)
155 This class represents a concrete, configured testing run.
158 def __init__(self, lit_config, tests):
159 self.lit_config = lit_config
162 def execute_test(self, test):
164 start_time = time.time()
166 result = test.config.test_format.execute(test, self.lit_config)
168 # Support deprecated result from execute() which returned the result
169 # code and additional output as a tuple.
170 if isinstance(result, tuple):
171 code, output = result
172 result = lit.Test.Result(code, output)
173 elif not isinstance(result, lit.Test.Result):
174 raise ValueError("unexpected result from test execution")
175 except KeyboardInterrupt:
178 if self.lit_config.debug:
180 output = 'Exception during script execution:\n'
181 output += traceback.format_exc()
183 result = lit.Test.Result(lit.Test.UNRESOLVED, output)
184 result.elapsed = time.time() - start_time
186 test.setResult(result)
188 def execute_tests(self, display, jobs, max_time=None,
189 use_processes=False):
191 execute_tests(display, jobs, [max_time])
193 Execute each of the tests in the run, using up to jobs number of
194 parallel tasks, and inform the display of each individual result. The
195 provided tests should be a subset of the tests available in this run
198 If max_time is non-None, it should be a time in seconds after which to
199 stop executing tests.
201 The display object will have its update method called with each test as
202 it is completed. The calls are guaranteed to be locked with respect to
203 one another, but are *not* guaranteed to be called on the same thread as
204 this method was invoked on.
206 Upon completion, each test in the run will have its result
207 computed. Tests which were not actually executed (for any reason) will
208 be given an UNRESOLVED result.
211 # Choose the appropriate parallel execution implementation.
213 if jobs != 1 and use_processes and multiprocessing:
215 task_impl = multiprocessing.Process
216 queue_impl = multiprocessing.Queue
217 canceled_flag = multiprocessing.Value('i', 0)
218 consumer = MultiprocessResultsConsumer(self, display, jobs)
220 # multiprocessing fails to initialize with certain OpenBSD and
221 # FreeBSD Python versions: http://bugs.python.org/issue3770
222 # Unfortunately the error raised also varies by platform.
223 self.lit_config.note('failed to initialize multiprocessing')
226 task_impl = threading.Thread
227 queue_impl = queue.Queue
228 canceled_flag = LockedValue(0)
229 consumer = ThreadResultsConsumer(display)
231 # Create the test provider.
232 provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)
234 # Install a console-control signal handler on Windows.
235 if win32api is not None:
236 def console_ctrl_handler(type):
239 win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
241 # Install a timeout handler, if requested.
242 if max_time is not None:
243 def timeout_handler():
245 timeout_timer = threading.Timer(max_time, timeout_handler)
246 timeout_timer.start()
248 # If not using multiple tasks, just run the tests directly.
250 run_one_tester(self, provider, consumer)
252 # Otherwise, execute the tests in parallel
253 self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
255 # Cancel the timeout handler.
256 if max_time is not None:
257 timeout_timer.cancel()
259 # Update results for any tests which weren't run.
260 for test in self.tests:
261 if test.result is None:
262 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
264 def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
265 # Start all of the tasks.
266 tasks = [task_impl(target=run_one_tester,
267 args=(self, provider, consumer))
268 for i in range(jobs)]
272 # Allow the consumer to handle results, if necessary.
273 consumer.handle_results()
275 # Wait for all the tasks to complete.