From: Daniel Dunbar Date: Thu, 29 Aug 2013 00:54:23 +0000 (+0000) Subject: [lit] Add support for multiprocessing, under --use-processes for now. X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=4ac723b53f2eb69e604891853ca87d1e2b3ee788;p=oota-llvm.git [lit] Add support for multiprocessing, under --use-processes for now. git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@189556 91177308-0d34-0410-b5e6-96231b3b80d8 --- diff --git a/utils/lit/TODO b/utils/lit/TODO index e6aeb3d9339..c1a60c6f4f0 100644 --- a/utils/lit/TODO +++ b/utils/lit/TODO @@ -113,8 +113,8 @@ Infrastructure module. This is currently blocked on: * The external execution mode is faster in some situations, because it avoids - being bottlenecked on the GIL. We could fix this by moving to a good - multiprocessing model. + being bottlenecked on the GIL. This can hopefully be obviated simply by + using --use-processes. * Some tests in LLVM/Clang are explicitly disabled with the internal shell (because they use features specific to bash). We would need to rewrite these @@ -158,8 +158,6 @@ Miscellaneous * Add --show-unsupported, don't show by default? -* Optionally use multiprocessing. - * Support valgrind in all configs, and LLVM style valgrind. * Support a timeout / ulimit. diff --git a/utils/lit/lit/main.py b/utils/lit/lit/main.py index 5c40f1ca533..50c9a66c8d3 100755 --- a/utils/lit/lit/main.py +++ b/utils/lit/lit/main.py @@ -142,6 +142,12 @@ def main(builtinParameters = {}): group.add_option("", "--show-tests", dest="showTests", help="Show all discovered tests", action="store_true", default=False) + group.add_option("", "--use-processes", dest="useProcesses", + help="Run tests in parallel with processes (not threads)", + action="store_true", default=False) + group.add_option("", "--use-threads", dest="useProcesses", + help="Run tests in parallel with threads (not processes)", + action="store_false", default=False) parser.add_option_group(group) (opts, args) = parser.parse_args() @@ -264,7 +270,8 @@ def main(builtinParameters = {}): startTime = time.time() display = TestingProgressDisplay(opts, len(run.tests), progressBar) try: - run.execute_tests(display, opts.numThreads, opts.maxTime) + run.execute_tests(display, opts.numThreads, opts.maxTime, + opts.useProcesses) except KeyboardInterrupt: sys.exit(2) display.finish() diff --git a/utils/lit/lit/run.py b/utils/lit/lit/run.py index 617c3b988f0..8642ff18927 100644 --- a/utils/lit/lit/run.py +++ b/utils/lit/lit/run.py @@ -2,42 +2,68 @@ import os import threading import time import traceback +try: + import Queue as queue +except ImportError: + import queue try: import win32api except ImportError: win32api = None +try: + import multiprocessing +except ImportError: + multiprocessing = None + import lit.Test ### # Test Execution Implementation -class TestProvider(object): - def __init__(self, tests): - self.iter = iter(range(len(tests))) +class LockedValue(object): + def __init__(self, value): self.lock = threading.Lock() - self.canceled = False + self._value = value - def cancel(self): + def _get_value(self): self.lock.acquire() - self.canceled = True - self.lock.release() + try: + return self._value + finally: + self.lock.release() - def get(self): - # Check if we are cancelled. + def _set_value(self, value): self.lock.acquire() - if self.canceled: - self.lock.release() + try: + self._value = value + finally: + self.lock.release() + + value = property(_get_value, _set_value) + +class TestProvider(object): + def __init__(self, tests, num_jobs, queue_impl, canceled_flag): + self.canceled_flag = canceled_flag + + # Create a shared queue to provide the test indices. + self.queue = queue_impl() + for i in range(len(tests)): + self.queue.put(i) + for i in range(num_jobs): + self.queue.put(None) + + def cancel(self): + self.canceled_flag.value = 1 + + def get(self): + # Check if we are canceled. + if self.canceled_flag.value: return None # Otherwise take the next test. - for item in self.iter: - break - else: - item = None - self.lock.release() - return item + return self.queue.get() class Tester(object): def __init__(self, run_instance, provider, consumer): @@ -46,7 +72,7 @@ class Tester(object): self.consumer = consumer def run(self): - while 1: + while True: item = self.provider.get() if item is None: break @@ -82,6 +108,42 @@ class ThreadResultsConsumer(object): def handle_results(self): pass +class MultiprocessResultsConsumer(object): + def __init__(self, run, display, num_jobs): + self.run = run + self.display = display + self.num_jobs = num_jobs + self.queue = multiprocessing.Queue() + + def update(self, test_index, test): + # This method is called in the child processes, and communicates the + # results to the actual display implementation via an output queue. + self.queue.put((test_index, test.result)) + + def task_finished(self): + # This method is called in the child processes, and communicates that + # individual tasks are complete. + self.queue.put(None) + + def handle_results(self): + # This method is called in the parent, and consumes the results from the + # output queue and dispatches to the actual display. The method will + # complete after each of num_jobs tasks has signalled completion. + completed = 0 + while completed != self.num_jobs: + # Wait for a result item. + item = self.queue.get() + if item is None: + completed += 1 + continue + + # Update the test result in the parent process. + index,result = item + test = self.run.tests[index] + test.result = result + + self.display.update(test) + def run_one_tester(run, provider, display): tester = Tester(run, provider, display) tester.run() @@ -123,7 +185,8 @@ class Run(object): test.setResult(result) - def execute_tests(self, display, jobs, max_time=None): + def execute_tests(self, display, jobs, max_time=None, + use_processes=False): """ execute_tests(display, jobs, [max_time]) @@ -145,8 +208,20 @@ class Run(object): be given an UNRESOLVED result. """ - # Create the test provider object. - provider = TestProvider(self.tests) + # Choose the appropriate parallel execution implementation. + if jobs == 1 or not use_processes or multiprocessing is None: + task_impl = threading.Thread + queue_impl = queue.Queue + canceled_flag = LockedValue(0) + consumer = ThreadResultsConsumer(display) + else: + task_impl = multiprocessing.Process + queue_impl = multiprocessing.Queue + canceled_flag = multiprocessing.Value('i', 0) + consumer = MultiprocessResultsConsumer(self, display, jobs) + + # Create the test provider. + provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag) # Install a console-control signal handler on Windows. if win32api is not None: @@ -162,8 +237,12 @@ class Run(object): timeout_timer = threading.Timer(max_time, timeout_handler) timeout_timer.start() - # Actually execute the tests. - self._execute_tests_with_provider(provider, display, jobs) + # If not using multiple tasks, just run the tests directly. + if jobs == 1: + run_one_tester(self, provider, consumer) + else: + # Otherwise, execute the tests in parallel + self._execute_tests_in_parallel(task_impl, provider, consumer, jobs) # Cancel the timeout handler. if max_time is not None: @@ -174,18 +253,10 @@ class Run(object): if test.result is None: test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) - def _execute_tests_with_provider(self, provider, display, jobs): - consumer = ThreadResultsConsumer(display) - - # If only using one testing thread, don't use tasks at all; this lets us - # profile, among other things. - if jobs == 1: - run_one_tester(self, provider, consumer) - return - + def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs): # Start all of the tasks. - tasks = [threading.Thread(target=run_one_tester, - args=(self, provider, consumer)) + tasks = [task_impl(target=run_one_tester, + args=(self, provider, consumer)) for i in range(jobs)] for t in tasks: t.start()