summaryrefslogtreecommitdiff
path: root/tools/testing/kunit/kunit_parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/testing/kunit/kunit_parser.py')
-rw-r--r--tools/testing/kunit/kunit_parser.py217
1 files changed, 144 insertions, 73 deletions
diff --git a/tools/testing/kunit/kunit_parser.py b/tools/testing/kunit/kunit_parser.py
index e8bcc139702e..c3c524b79db8 100644
--- a/tools/testing/kunit/kunit_parser.py
+++ b/tools/testing/kunit/kunit_parser.py
@@ -43,26 +43,68 @@ class TestCase(object):
class TestStatus(Enum):
SUCCESS = auto()
FAILURE = auto()
+ SKIPPED = auto()
TEST_CRASHED = auto()
NO_TESTS = auto()
FAILURE_TO_PARSE_TESTS = auto()
+class LineStream:
+ """Provides a peek()/pop() interface over an iterator of (line#, text)."""
+ _lines: Iterator[Tuple[int, str]]
+ _next: Tuple[int, str]
+ _done: bool
+
+ def __init__(self, lines: Iterator[Tuple[int, str]]):
+ self._lines = lines
+ self._done = False
+ self._next = (0, '')
+ self._get_next()
+
+ def _get_next(self) -> None:
+ try:
+ self._next = next(self._lines)
+ except StopIteration:
+ self._done = True
+
+ def peek(self) -> str:
+ return self._next[1]
+
+ def pop(self) -> str:
+ n = self._next
+ self._get_next()
+ return n[1]
+
+ def __bool__(self) -> bool:
+ return not self._done
+
+ # Only used by kunit_tool_test.py.
+ def __iter__(self) -> Iterator[str]:
+ while bool(self):
+ yield self.pop()
+
+ def line_number(self) -> int:
+ return self._next[0]
+
kunit_start_re = re.compile(r'TAP version [0-9]+$')
kunit_end_re = re.compile('(List of all partitions:|'
- 'Kernel panic - not syncing: VFS:)')
-
-def isolate_kunit_output(kernel_output) -> Iterator[str]:
- started = False
- for line in kernel_output:
- line = line.rstrip() # line always has a trailing \n
- if kunit_start_re.search(line):
- prefix_len = len(line.split('TAP version')[0])
- started = True
- yield line[prefix_len:] if prefix_len > 0 else line
- elif kunit_end_re.search(line):
- break
- elif started:
- yield line[prefix_len:] if prefix_len > 0 else line
+ 'Kernel panic - not syncing: VFS:|reboot: System halted)')
+
+def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
+ def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
+ line_num = 0
+ started = False
+ for line in kernel_output:
+ line_num += 1
+ line = line.rstrip() # line always has a trailing \n
+ if kunit_start_re.search(line):
+ prefix_len = len(line.split('TAP version')[0])
+ started = True
+ yield line_num, line[prefix_len:]
+ elif kunit_end_re.search(line):
+ break
+ elif started:
+ yield line_num, line[prefix_len:]
+ return LineStream(lines=isolate_kunit_output(kernel_output))
def raw_output(kernel_output) -> None:
for line in kernel_output:
@@ -97,34 +139,40 @@ def print_log(log) -> None:
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
-def consume_non_diagnostic(lines: List[str]) -> None:
- while lines and not TAP_ENTRIES.match(lines[0]):
- lines.pop(0)
+def consume_non_diagnostic(lines: LineStream) -> None:
+ while lines and not TAP_ENTRIES.match(lines.peek()):
+ lines.pop()
-def save_non_diagnostic(lines: List[str], test_case: TestCase) -> None:
- while lines and not TAP_ENTRIES.match(lines[0]):
- test_case.log.append(lines[0])
- lines.pop(0)
+def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
+ while lines and not TAP_ENTRIES.match(lines.peek()):
+ test_case.log.append(lines.peek())
+ lines.pop()
OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])
+OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')
+
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
-def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
+def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
save_non_diagnostic(lines, test_case)
if not lines:
test_case.status = TestStatus.TEST_CRASHED
return True
- line = lines[0]
+ line = lines.peek()
match = OK_NOT_OK_SUBTEST.match(line)
while not match and lines:
- line = lines.pop(0)
+ line = lines.pop()
match = OK_NOT_OK_SUBTEST.match(line)
if match:
- test_case.log.append(lines.pop(0))
+ test_case.log.append(lines.pop())
test_case.name = match.group(2)
+ skip_match = OK_NOT_OK_SKIP.match(line)
+ if skip_match:
+ test_case.status = TestStatus.SKIPPED
+ return True
if test_case.status == TestStatus.TEST_CRASHED:
return True
if match.group(1) == 'ok':
@@ -138,14 +186,14 @@ def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
-def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
+def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
save_non_diagnostic(lines, test_case)
if not lines:
return False
- line = lines[0]
+ line = lines.peek()
match = SUBTEST_DIAGNOSTIC.match(line)
if match:
- test_case.log.append(lines.pop(0))
+ test_case.log.append(lines.pop())
crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
if crash_match:
test_case.status = TestStatus.TEST_CRASHED
@@ -153,7 +201,7 @@ def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
else:
return False
-def parse_test_case(lines: List[str]) -> Optional[TestCase]:
+def parse_test_case(lines: LineStream) -> Optional[TestCase]:
test_case = TestCase()
save_non_diagnostic(lines, test_case)
while parse_diagnostic(lines, test_case):
@@ -165,55 +213,58 @@ def parse_test_case(lines: List[str]) -> Optional[TestCase]:
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
-def parse_subtest_header(lines: List[str]) -> Optional[str]:
+def parse_subtest_header(lines: LineStream) -> Optional[str]:
consume_non_diagnostic(lines)
if not lines:
return None
- match = SUBTEST_HEADER.match(lines[0])
+ match = SUBTEST_HEADER.match(lines.peek())
if match:
- lines.pop(0)
+ lines.pop()
return match.group(1)
else:
return None
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
-def parse_subtest_plan(lines: List[str]) -> Optional[int]:
+def parse_subtest_plan(lines: LineStream) -> Optional[int]:
consume_non_diagnostic(lines)
- match = SUBTEST_PLAN.match(lines[0])
+ match = SUBTEST_PLAN.match(lines.peek())
if match:
- lines.pop(0)
+ lines.pop()
return int(match.group(1))
else:
return None
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
- if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
+ if left == right:
+ return left
+ elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
return TestStatus.TEST_CRASHED
elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
return TestStatus.FAILURE
- elif left != TestStatus.SUCCESS:
- return left
- elif right != TestStatus.SUCCESS:
+ elif left == TestStatus.SKIPPED:
return right
else:
- return TestStatus.SUCCESS
+ return left
-def parse_ok_not_ok_test_suite(lines: List[str],
+def parse_ok_not_ok_test_suite(lines: LineStream,
test_suite: TestSuite,
expected_suite_index: int) -> bool:
consume_non_diagnostic(lines)
if not lines:
test_suite.status = TestStatus.TEST_CRASHED
return False
- line = lines[0]
+ line = lines.peek()
match = OK_NOT_OK_MODULE.match(line)
if match:
- lines.pop(0)
+ lines.pop()
if match.group(1) == 'ok':
test_suite.status = TestStatus.SUCCESS
else:
test_suite.status = TestStatus.FAILURE
+ skip_match = OK_NOT_OK_SKIP.match(line)
+ if skip_match:
+ test_suite.status = TestStatus.SKIPPED
suite_index = int(match.group(2))
if suite_index != expected_suite_index:
print_with_timestamp(
@@ -224,14 +275,14 @@ def parse_ok_not_ok_test_suite(lines: List[str],
else:
return False
-def bubble_up_errors(statuses: Iterable[TestStatus]) -> TestStatus:
- return reduce(max_status, statuses, TestStatus.SUCCESS)
+def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
+ return reduce(max_status, status_list, TestStatus.SKIPPED)
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases)
return max_status(max_test_case_status, test_suite.status)
-def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[TestSuite]:
+def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
if not lines:
return None
consume_non_diagnostic(lines)
@@ -257,26 +308,26 @@ def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[Te
print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
return test_suite
else:
- print('failed to parse end of suite' + lines[0])
+ print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
return None
TAP_HEADER = re.compile(r'^TAP version 14$')
-def parse_tap_header(lines: List[str]) -> bool:
+def parse_tap_header(lines: LineStream) -> bool:
consume_non_diagnostic(lines)
- if TAP_HEADER.match(lines[0]):
- lines.pop(0)
+ if TAP_HEADER.match(lines.peek()):
+ lines.pop()
return True
else:
return False
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
-def parse_test_plan(lines: List[str]) -> Optional[int]:
+def parse_test_plan(lines: LineStream) -> Optional[int]:
consume_non_diagnostic(lines)
- match = TEST_PLAN.match(lines[0])
+ match = TEST_PLAN.match(lines.peek())
if match:
- lines.pop(0)
+ lines.pop()
return int(match.group(1))
else:
return None
@@ -284,7 +335,7 @@ def parse_test_plan(lines: List[str]) -> Optional[int]:
def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
return bubble_up_errors(x.status for x in test_suites)
-def parse_test_result(lines: List[str]) -> TestResult:
+def parse_test_result(lines: LineStream) -> TestResult:
consume_non_diagnostic(lines)
if not lines or not parse_tap_header(lines):
return TestResult(TestStatus.NO_TESTS, [], lines)
@@ -311,49 +362,69 @@ def parse_test_result(lines: List[str]) -> TestResult:
else:
return TestResult(TestStatus.NO_TESTS, [], lines)
-def print_and_count_results(test_result: TestResult) -> Tuple[int, int, int]:
- total_tests = 0
- failed_tests = 0
- crashed_tests = 0
+class TestCounts:
+ passed: int
+ failed: int
+ crashed: int
+ skipped: int
+
+ def __init__(self):
+ self.passed = 0
+ self.failed = 0
+ self.crashed = 0
+ self.skipped = 0
+
+ def total(self) -> int:
+ return self.passed + self.failed + self.crashed + self.skipped
+
+def print_and_count_results(test_result: TestResult) -> TestCounts:
+ counts = TestCounts()
for test_suite in test_result.suites:
if test_suite.status == TestStatus.SUCCESS:
print_suite_divider(green('[PASSED] ') + test_suite.name)
+ elif test_suite.status == TestStatus.SKIPPED:
+ print_suite_divider(yellow('[SKIPPED] ') + test_suite.name)
elif test_suite.status == TestStatus.TEST_CRASHED:
print_suite_divider(red('[CRASHED] ' + test_suite.name))
else:
print_suite_divider(red('[FAILED] ') + test_suite.name)
for test_case in test_suite.cases:
- total_tests += 1
if test_case.status == TestStatus.SUCCESS:
+ counts.passed += 1
print_with_timestamp(green('[PASSED] ') + test_case.name)
+ elif test_case.status == TestStatus.SKIPPED:
+ counts.skipped += 1
+ print_with_timestamp(yellow('[SKIPPED] ') + test_case.name)
elif test_case.status == TestStatus.TEST_CRASHED:
- crashed_tests += 1
+ counts.crashed += 1
print_with_timestamp(red('[CRASHED] ' + test_case.name))
print_log(map(yellow, test_case.log))
print_with_timestamp('')
else:
- failed_tests += 1
+ counts.failed += 1
print_with_timestamp(red('[FAILED] ') + test_case.name)
print_log(map(yellow, test_case.log))
print_with_timestamp('')
- return total_tests, failed_tests, crashed_tests
+ return counts
-def parse_run_tests(kernel_output) -> TestResult:
- total_tests = 0
- failed_tests = 0
- crashed_tests = 0
- test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
+def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
+ counts = TestCounts()
+ lines = extract_tap_lines(kernel_output)
+ test_result = parse_test_result(lines)
if test_result.status == TestStatus.NO_TESTS:
print(red('[ERROR] ') + yellow('no tests run!'))
elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
print(red('[ERROR] ') + yellow('could not parse test results!'))
else:
- (total_tests,
- failed_tests,
- crashed_tests) = print_and_count_results(test_result)
+ counts = print_and_count_results(test_result)
print_with_timestamp(DIVIDER)
- fmt = green if test_result.status == TestStatus.SUCCESS else red
+ if test_result.status == TestStatus.SUCCESS:
+ fmt = green
+ elif test_result.status == TestStatus.SKIPPED:
+ fmt = yellow
+ else:
+ fmt =red
print_with_timestamp(
- fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
- (total_tests, failed_tests, crashed_tests)))
+ fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
+ (counts.total(), counts.failed, counts.crashed, counts.skipped)))
return test_result