summaryrefslogtreecommitdiff
path: root/poky/meta/lib/oeqa/core/utils/concurrencytest.py
diff options
context:
space:
mode:
Diffstat (limited to 'poky/meta/lib/oeqa/core/utils/concurrencytest.py')
-rw-r--r--poky/meta/lib/oeqa/core/utils/concurrencytest.py53
1 files changed, 27 insertions, 26 deletions
diff --git a/poky/meta/lib/oeqa/core/utils/concurrencytest.py b/poky/meta/lib/oeqa/core/utils/concurrencytest.py
index b2eb68fb0..161a2f6e9 100644
--- a/poky/meta/lib/oeqa/core/utils/concurrencytest.py
+++ b/poky/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -48,11 +48,15 @@ _all__ = [
#
class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
- def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
+ def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult):
super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
self.threadnum = threadnum
self.totalinprocess = totalinprocess
self.totaltests = totaltests
+ self.buffer = True
+ self.outputbuf = output
+ self.finalresult = finalresult
+ self.finalresult.buffer = True
def _add_result_with_semaphore(self, method, test, *args, **kwargs):
self.semaphore.acquire()
@@ -71,6 +75,8 @@ class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
test.id())
finally:
self.semaphore.release()
+ self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8"))
+ self.finalresult._stdout_buffer = io.StringIO()
super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
class ProxyTestResult:
@@ -190,28 +196,20 @@ class ConcurrentTestSuite(unittest.TestSuite):
self.removefunc = removefunc
def run(self, result):
- tests, totaltests = fork_for_tests(self.processes, self)
+ testservers, totaltests = fork_for_tests(self.processes, self)
try:
threads = {}
queue = Queue()
semaphore = threading.Semaphore(1)
result.threadprogress = {}
- for i, (test, testnum) in enumerate(tests):
+ for i, (testserver, testnum, output) in enumerate(testservers):
result.threadprogress[i] = []
process_result = BBThreadsafeForwardingResult(
ExtraResultsDecoderTestResult(result),
- semaphore, i, testnum, totaltests)
- # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
- # as per default in parent code
- process_result.buffer = True
- # We have to add a buffer object to stdout to keep subunit happy
- process_result._stderr_buffer = io.StringIO()
- process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
- process_result._stdout_buffer = io.StringIO()
- process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
+ semaphore, i, testnum, totaltests, output, result)
reader_thread = threading.Thread(
- target=self._run_test, args=(test, process_result, queue))
- threads[test] = reader_thread, process_result
+ target=self._run_test, args=(testserver, process_result, queue))
+ threads[testserver] = reader_thread, process_result
reader_thread.start()
while threads:
finished_test = queue.get()
@@ -222,13 +220,13 @@ class ConcurrentTestSuite(unittest.TestSuite):
process_result.stop()
raise
finally:
- for test in tests:
- test[0]._stream.close()
+ for testserver in testservers:
+ testserver[0]._stream.close()
- def _run_test(self, test, process_result, queue):
+ def _run_test(self, testserver, process_result, queue):
try:
try:
- test.run(process_result)
+ testserver.run(process_result)
except Exception:
# The run logic itself failed
case = testtools.ErrorHolder(
@@ -236,10 +234,10 @@ class ConcurrentTestSuite(unittest.TestSuite):
error=sys.exc_info())
case.run(process_result)
finally:
- queue.put(test)
+ queue.put(testserver)
def fork_for_tests(concurrency_num, suite):
- result = []
+ testservers = []
if 'BUILDDIR' in os.environ:
selftestdir = get_test_layer()
@@ -273,10 +271,11 @@ def fork_for_tests(concurrency_num, suite):
newsi = os.open(os.devnull, os.O_RDWR)
os.dup2(newsi, sys.stdin.fileno())
+ # Send stdout/stderr over the stream
+ os.dup2(c2pwrite, sys.stdout.fileno())
+ os.dup2(c2pwrite, sys.stderr.fileno())
+
subunit_client = TestProtocolClient(stream)
- # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
- # as per default in parent code
- subunit_client.buffer = True
subunit_result = AutoTimingTestResultDecorator(subunit_client)
unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result))
if ourpid != os.getpid():
@@ -306,9 +305,11 @@ def fork_for_tests(concurrency_num, suite):
else:
os.close(c2pwrite)
stream = os.fdopen(c2pread, 'rb', 1)
- test = ProtocolTestCase(stream)
- result.append((test, numtests))
- return result, totaltests
+ # Collect stdout/stderr into an io buffer
+ output = io.BytesIO()
+ testserver = ProtocolTestCase(stream, passthrough=output)
+ testservers.append((testserver, numtests, output))
+ return testservers, totaltests
def partition_tests(suite, count):
# Keep tests from the same class together but allow tests from modules