0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 from __future__ import annotations
0013 import re
0014 import sys
0015
0016 from enum import Enum, auto
0017 from typing import Iterable, Iterator, List, Optional, Tuple
0018
0019 from kunit_printer import stdout
0020
class Test:
	"""
	A class to represent a test parsed from KTAP results. All KTAP
	results within a test log are stored in a main Test object as
	subtests.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : Optional[int] - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Creates Test object with default attributes."""
		# Default to TEST_CRASHED so that a test whose result line is
		# never parsed gets reported as crashed, not silently passed.
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		# None means no test plan line was seen, so the number of
		# subtests is unknown.
		self.expected_count: Optional[int] = 0
		self.subtests: List[Test] = []
		self.log: List[str] = []
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns string representation of a Test class object."""
		return (f'Test({self.status}, {self.name}, {self.expected_count}, '
			f'{self.subtests}, {self.log}, {self.counts})')

	def __repr__(self) -> str:
		"""Returns string representation of a Test class object."""
		return str(self)

	def add_error(self, error_message: str) -> None:
		"""Records an error that occurred while parsing this test and
		prints it with a timestamp, prefixed by the test name."""
		self.counts.errors += 1
		stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
0060
class TestStatus(Enum):
	"""Enumeration of the possible statuses of a parsed test."""
	SUCCESS = 1
	FAILURE = 2
	SKIPPED = 3
	TEST_CRASHED = 4
	NO_TESTS = 5
	FAILURE_TO_PARSE_TESTS = 6
0069
class TestCounts:
	"""
	Tracks how many test cases finished with each status, plus the
	number of parse errors, within one Test.

	Attributes:
	passed : int - the number of tests that have passed
	failed : int - the number of tests that have failed
	crashed : int - the number of tests that have crashed
	skipped : int - the number of tests that have skipped
	errors : int - the number of errors in the test and subtests
	"""

	# Counter attributes accumulated by add_subtest_counts().
	_FIELDS = ('passed', 'failed', 'crashed', 'skipped', 'errors')

	def __init__(self):
		"""Initializes every counter to 0."""
		self.passed = 0
		self.failed = 0
		self.crashed = 0
		self.skipped = 0
		self.errors = 0

	def __str__(self) -> str:
		"""Returns a one-line human-readable summary of the counts,
		listing only the non-zero counters."""
		pairs = (('passed', self.passed), ('failed', self.failed),
			 ('crashed', self.crashed), ('skipped', self.skipped),
			 ('errors', self.errors))
		shown = ', '.join(f'{label}: {count}' for label, count in pairs if count > 0)
		return f'Ran {self.total()} tests: ' + shown

	def total(self) -> int:
		"""Returns the total number of test cases within a test
		object, where a test case is a test with no subtests.
		Errors are not test cases and are excluded.
		"""
		return sum((self.passed, self.failed, self.crashed, self.skipped))

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Folds the counts of another TestCounts object into this one.
		Used to add the counts of a subtest to the parent test.

		Parameters:
		counts - a different TestCounts object whose counts
			will be added to the counts of the TestCounts object
		"""
		for field in self._FIELDS:
			setattr(self, field, getattr(self, field) + getattr(counts, field))

	def get_status(self) -> TestStatus:
		"""Returns the aggregated status of a Test using test
		counts.
		"""
		if self.total() == 0:
			return TestStatus.NO_TESTS
		if self.crashed:
			# Any crash makes the whole test count as crashed.
			return TestStatus.TEST_CRASHED
		if self.failed:
			return TestStatus.FAILURE
		if self.passed:
			# No crashes or failures; a pass outweighs skips.
			return TestStatus.SUCCESS
		return TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the counter corresponding to `status`."""
		if status == TestStatus.SUCCESS:
			self.passed += 1
		elif status == TestStatus.FAILURE:
			self.failed += 1
		elif status == TestStatus.SKIPPED:
			self.skipped += 1
		elif status != TestStatus.NO_TESTS:
			# TEST_CRASHED and FAILURE_TO_PARSE_TESTS both count
			# as crashes; NO_TESTS is not counted at all.
			self.crashed += 1
0149 self.crashed += 1
0150
class LineStream:
	"""
	A class to represent the lines of kernel output.
	Provides a lazy peek()/pop() interface over an iterator of
	(line#, text) pairs.
	"""
	_pairs: Iterator[Tuple[int, str]]
	_current: Tuple[int, str]
	_pending: bool
	_exhausted: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Creates a new LineStream that wraps the given iterator."""
		self._pairs = lines
		self._current = (0, '')
		self._exhausted = False
		self._pending = True

	def _advance(self) -> None:
		"""Pulls the next pair from the iterator if one is pending;
		marks the stream exhausted at the end of input."""
		if not self._pending:
			return
		try:
			self._current = next(self._pairs)
		except StopIteration:
			self._exhausted = True
		finally:
			self._pending = False

	def peek(self) -> str:
		"""Returns the current line without consuming it."""
		self._advance()
		return self._current[1]

	def pop(self) -> str:
		"""Consumes and returns the current line.

		Raises:
		ValueError - if called after the stream is exhausted.
		"""
		line = self.peek()
		if self._exhausted:
			raise ValueError(f'LineStream: going past EOF, last line was {line}')
		self._pending = True
		return line

	def __bool__(self) -> bool:
		"""Returns True if stream has more lines."""
		self._advance()
		return not self._exhausted

	def __iter__(self) -> Iterator[str]:
		"""Yields every remaining line of the stream, in order."""
		while self:
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the line number of the current line."""
		self._advance()
		return self._current[0]
0213
0214
0215
# Matches the start of KTAP output, e.g. 'KTAP version 1';
# group 1 is the version number.
KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
# Matches the start of legacy TAP output, e.g. 'TAP version 13';
# group 1 is the version number.
TAP_START = re.compile(r'TAP version ([0-9]+)$')
# Kernel messages that mark the end of any KTAP output
# (panic or reboot notices).
KTAP_END = re.compile('(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')
0220
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""Extracts KTAP lines from the kernel output.

	Parameters:
	kernel_output - iterable of raw kernel log lines

	Return:
	LineStream of (line number, text) pairs containing only the
	KTAP/TAP portion of the log, with any log prefix (e.g.
	timestamps) stripped
	"""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		line_num = 0
		started = False
		for line in kernel_output:
			line_num += 1
			line = line.rstrip()  # drop trailing whitespace/newline
			if not started and KTAP_START.search(line):
				# Found the KTAP header. Remember how many
				# characters precede it (e.g. a timestamp
				# prefix) so the same prefix can be stripped
				# from every following line.
				prefix_len = len(
					line.split('KTAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif not started and TAP_START.search(line):
				# Same as above, but for a legacy TAP header
				# line.
				prefix_len = len(line.split('TAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif started and KTAP_END.search(line):
				# Stop parsing once the kernel reports a
				# panic or reboot.
				break
			elif started:
				# prefix_len carries over from the header
				# branch above; strip the prefix and any
				# remaining leading whitespace.
				line = line[prefix_len:].lstrip()
				yield line_num, line
	return LineStream(lines=isolate_ktap_output(kernel_output))
0252
# KTAP version numbers this parser accepts.
KTAP_VERSIONS = [1]
# Legacy TAP version numbers this parser accepts.
TAP_VERSIONS = [13, 14]
0255
def check_version(version_num: int, accepted_versions: List[int],
		  version_type: str, test: Test) -> None:
	"""
	Adds error to test object if version number is too high or too
	low.

	Parameters:
	version_num - The inputted version number from the parsed KTAP or TAP
		header line
	accepted_versions - List of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	"""
	if version_num < min(accepted_versions):
		test.add_error(f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		# Fix: error message previously read 'higer'.
		test.add_error(f'{version_type} version higher than expected!')
0274
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses the KTAP/TAP header line and checks its version number.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the current line is a KTAP/TAP header (the line is then
	consumed and logged); False otherwise (the line is left in place)
	"""
	header = lines.peek()
	ktap = KTAP_START.match(header)
	if ktap:
		check_version(int(ktap.group(1)), KTAP_VERSIONS, 'KTAP', test)
	else:
		tap = TAP_START.match(header)
		if not tap:
			return False
		check_version(int(tap.group(1)), TAP_VERSIONS, 'TAP', test)
	test.log.append(lines.pop())
	return True
0303
# Matches a subtest header line, e.g. '# Subtest: example';
# group 1 is the subtest name.
TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
0305
def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses a test header line and stores the test name in the test
	object.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the current line is a test header (it is then consumed
	and logged); False otherwise (the line is left in place)
	"""
	header = TEST_HEADER.match(lines.peek())
	if header is None:
		return False
	test.log.append(lines.pop())
	test.name = header.group(1)
	return True
0327
# Matches a test plan line, e.g. '1..4'; group 1 is the expected
# number of subtests.
TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
0329
def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Parses a test plan line and stores the expected number of subtests
	in the test object. Sets expected_count to None (unknown) if there
	is no valid test plan on the current line.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the current line is a test plan (it is then consumed and
	logged); False otherwise (the line is left in place)
	"""
	plan = TEST_PLAN.match(lines.peek())
	if plan is None:
		test.expected_count = None
		return False
	test.log.append(lines.pop())
	test.expected_count = int(plan.group(1))
	return True
0355
# Matches a test result line, e.g. 'ok 1 - name # comment'.
# Groups: 1 = 'ok' or 'not ok', 2 = test number, 4 = test name
# (everything before an optional '#' directive).
TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

# Like TEST_RESULT, but only matches result lines carrying a SKIP
# directive; group 4 is the test name.
TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
0359
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Checks, without consuming any input, whether the current line is a
	test result line whose name equals the current test's name.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the current line is a result line AND its name matches
	the expected test name
	"""
	result = TEST_RESULT.match(lines.peek())
	if result is None:
		return False
	return result.group(4) == test.name
0384
def parse_test_result(lines: LineStream, test: Test,
		      expected_num: int) -> bool:
	"""
	Parses a test result line, storing the status and name in the test
	object, and records an error if the test number does not match the
	expected test number.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test

	Return:
	True if the current line is a result line (it is then consumed
	and logged); False otherwise (the line is left in place)
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	if not match:
		return False
	skip_match = TEST_RESULT_SKIP.match(line)
	test.log.append(lines.pop())

	# Prefer the SKIP regex's name capture when the line carries a
	# SKIP directive; otherwise use the plain result capture.
	test.name = (skip_match or match).group(4)

	# Report mismatched test numbers, but keep parsing.
	num = int(match.group(2))
	if num != expected_num:
		test.add_error(f'Expected test number {expected_num} but found {num}')

	# SKIP overrides ok/not ok; otherwise 'ok' is a pass.
	if skip_match:
		test.status = TestStatus.SKIPPED
	elif match.group(1) == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True
0437
def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Collects consecutive lines that are neither test result lines nor
	test header lines and returns them as a list.

	Line formats that are NOT collected (they end the scan):
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	log: List[str] = []
	while lines:
		line = lines.peek()
		if TEST_RESULT.match(line) or TEST_HEADER.match(line):
			break
		log.append(lines.pop())
	return log
0459
0460
0461
0462
# Fixed-width divider used to frame all printed output.
DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns string with message centered in fixed width divider.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - visible length of the message (color escape codes
		excluded), which may differ from len(message)

	Return:
	String containing message centered in fixed width divider
	"""
	pad_total = len(DIVIDER) - len_message - 2
	if pad_total > 0:
		left = pad_total // 2
		right = pad_total - left
	else:
		# Message too long to center: fall back to minimal padding.
		left = right = 3
	return f"{'=' * left} {message} {'=' * right}"
0489
def print_test_header(test: Test) -> None:
	"""
	Prints a divider header with the test name and, when known, the
	expected number of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	"""
	message = test.name
	if test.expected_count:
		if test.expected_count == 1:
			suffix = ' (1 subtest)'
		else:
			suffix = f' ({test.expected_count} subtests)'
		message += suffix
	stdout.print_with_timestamp(format_test_divider(message, len(message)))
0508
def print_log(log: Iterable[str]) -> None:
	"""Prints every saved log line for a test, colored yellow."""
	for entry in log:
		stdout.print_with_timestamp(stdout.yellow(entry))
0513
def format_test_result(test: Test) -> str:
	"""
	Returns the test name prefixed with a colored status tag. For
	crashed or failed tests, the test's saved log is printed first.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed

	Return:
	String containing formatted test result
	"""
	status = test.status
	if status == TestStatus.SUCCESS:
		return stdout.green('[PASSED] ') + test.name
	if status == TestStatus.SKIPPED:
		return stdout.yellow('[SKIPPED] ') + test.name
	if status == TestStatus.NO_TESTS:
		return stdout.yellow('[NO TESTS RUN] ') + test.name
	# Crashes and failures dump the log before the result line.
	print_log(test.log)
	if status == TestStatus.TEST_CRASHED:
		return stdout.red('[CRASHED] ') + test.name
	return stdout.red('[FAILED] ') + test.name
0539
def print_test_result(test: Test) -> None:
	"""
	Prints result line with status of test.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	"""
	result_line = format_test_result(test)
	stdout.print_with_timestamp(result_line)
0551
def print_test_footer(test: Test) -> None:
	"""
	Prints a divider-style footer containing the test's colored result.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	"""
	result = format_test_result(test)
	# Color escape codes must not count toward the visible width.
	visible_len = len(result) - stdout.color_len()
	stdout.print_with_timestamp(format_test_divider(result, visible_len))
0565
def print_summary_line(test: Test) -> None:
	"""
	Prints the final summary line with the test's counts. The line is
	green when the test passed, yellow when it was skipped or ran no
	tests, and red when it failed or crashed.

	Example:
	"Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
	Errors: 0"

	Parameters:
	test - Test object representing current test being printed
	"""
	status = test.status
	if status == TestStatus.SUCCESS:
		colorize = stdout.green
	elif status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		colorize = stdout.yellow
	else:
		colorize = stdout.red
	stdout.print_with_timestamp(colorize(f'Testing complete. {test.counts}'))
0587
0588
0589
def bubble_up_test_results(test: Test) -> None:
	"""
	Propagates results upward: folds every subtest's counts into this
	test's counts. A test with no counted cases counts its own status;
	otherwise, if the aggregated counts indicate a crash, the test's
	status is set to crashed.

	Parameters:
	test - Test object for current test being parsed
	"""
	for subtest in test.subtests:
		test.counts.add_subtest_counts(subtest.counts)
	if test.counts.total() == 0:
		# Leaf test case: record its own status in the counts.
		test.counts.add_status(test.status)
	elif test.counts.get_status() == TestStatus.TEST_CRASHED:
		test.status = TestStatus.TEST_CRASHED
0609
def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
	"""
	Finds next test to parse in LineStream, creates new Test object,
	parses any subtests of the test, populates Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. The method accepts
	three formats of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header line

	Example:

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)
	parent_test = False
	main = parse_ktap_header(lines, test)
	if main:
		# If KTAP/TAP header is found, this is the main test object;
		# attempt to parse the test plan.
		test.name = "main"
		parse_test_plan(lines, test)
		parent_test = True
	else:
		# No KTAP/TAP header, so this must be a subtest header or a
		# test result line; attempt to parse a subtest header.
		parent_test = parse_test_header(lines, test)
		if parent_test:
			# Subtest header found: attempt to parse the test
			# plan and print the header.
			parse_test_plan(lines, test)
			print_test_header(test)
	expected_count = test.expected_count
	subtests = []
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Parse subtests. Stop after the expected number of subtests,
		# or — if that number is unknown — when a result line matching
		# this test's name is found or the stream runs out of lines.
		sub_log = parse_diagnostic(lines)
		sub_test = Test()
		if not lines or (peek_test_name_match(lines, test) and
				not main):
			if expected_count and test_num <= expected_count:
				# Stream ended (or this test's own result line
				# was reached) before all expected subtests
				# were seen: record the missing subtest as
				# crashed and report an error.
				test.add_error('missing expected subtest!')
				sub_test.log.extend(sub_log)
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test)
			else:
				# Unknown plan and no more subtests: keep the
				# diagnostic lines with this test.
				test.log.extend(sub_log)
				break
		else:
			sub_test = parse_test(lines, test_num, sub_log)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if not main:
		# Non-main tests are terminated by their own result line;
		# look for it after any remaining diagnostics.
		test.log.extend(parse_diagnostic(lines))
		if (parent_test and peek_test_name_match(lines, test)) or \
				not parent_test:
			parse_test_result(lines, test, expected_num)
		else:
			test.add_error('missing subtest result line!')

	# A parent test with zero parsed subtests ran no tests at all.
	if parent_test and len(subtests) == 0:
		# Only override statuses that would otherwise be misleading:
		# the constructor default (TEST_CRASHED) or a bare SUCCESS.
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			test.status = TestStatus.NO_TESTS
			test.add_error('0 tests run!')

	# Fold subtest counts/statuses into this test's counts.
	bubble_up_test_results(test)
	if parent_test and not main:
		# Tests with subtests (other than main) get a footer line.
		print_test_footer(test)
	elif not main:
		print_test_result(test)
	return test
0728
def parse_run_tests(kernel_output: Iterable[str]) -> Test:
	"""
	Using kernel output, extract KTAP lines, parse the lines for test
	results and print condensed test results and summary line.

	Parameters:
	kernel_output - Iterable object contains lines of kernel output

	Return:
	Test - the main test object with all subtests.
	"""
	stdout.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	if lines:
		test = parse_test(lines, 0, [])
		if test.status != TestStatus.NO_TESTS:
			# Recompute the overall status from the counts.
			test.status = test.counts.get_status()
	else:
		# No KTAP output at all: synthesize a failing placeholder.
		test = Test()
		test.name = '<missing>'
		test.add_error('could not find any KTAP output!')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	stdout.print_with_timestamp(DIVIDER)
	print_summary_line(test)
	return test