0001
0002
0003
0004 """
0005 tdc.py - Linux tc (Traffic Control) unit test driver
0006
0007 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
0008 """
0009
0010 import re
0011 import os
0012 import sys
0013 import argparse
0014 import importlib
0015 import json
0016 import subprocess
0017 import time
0018 import traceback
0019 from collections import OrderedDict
0020 from string import Template
0021
0022 from tdc_config import *
0023 from tdc_helper import *
0024
0025 import TdcPlugin
0026 from TdcResults import *
0027
class PluginDependencyException(Exception):
    """Raised when one or more plugins required by a test cannot be found."""

    def __init__(self, missing_pg):
        # Filenames of the plugins that could not be located.
        self.missing_pg = missing_pg
0031
class PluginMgrTestFail(Exception):
    """Raised when a test stage (setup/execute/verify/teardown) fails.

    Carries the failing stage name, the accumulated command output, and
    a human-readable message for the test report.
    """

    def __init__(self, stage, output, message):
        self.stage = stage
        self.output = output
        self.message = message
0037
class PluginMgr:
    """Discover, load, and drive tdc plugins.

    Plugins found under TDC_PLUGIN_DIR (default: ./plugins) are imported
    at construction time.  Test cases may additionally require plugins
    from plugin-lib/ or plugin-lib-custom/, which are loaded on demand by
    load_required_plugins().  The call_* methods fan each tdc lifecycle
    hook out to every loaded plugin instance; post_* hooks run in reverse
    load order so plugins unwind like a stack.
    """

    def __init__(self, argparser):
        super().__init__()
        self.plugins = {}             # plugin module name -> module object
        self.plugin_instances = []    # SubPlugin instances, in load order
        self.failed_plugins = {}
        self.argparser = argparser

        # Auto-load every plugin shipped in the plugin directory, skipping
        # package markers and emacs-style backup/lock files.
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                if (fn.endswith('.py') and
                    not fn == '__init__.py' and
                    not fn.startswith('#') and
                    not fn.startswith('.#')):
                    mn = fn[0:-3]
                    foo = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = foo
                    self.plugin_instances.append(foo.SubPlugin())

    def load_plugin(self, pgdir, pgname):
        """Import one plugin file from pgdir and register its SubPlugin."""
        pgname = pgname[0:-3]
        foo = importlib.import_module('{}.{}'.format(pgdir, pgname))
        self.plugins[pgname] = foo
        self.plugin_instances.append(foo.SubPlugin())
        # Let the newly loaded plugin validate the already-parsed
        # arguments (requires set_args() to have been called first).
        self.plugin_instances[-1].check_args(self.args, None)

    def get_required_plugins(self, testlist):
        '''
        Get all required plugins from the list of test cases and return
        all unique items.
        '''
        reqs = []
        for t in testlist:
            try:
                if 'requires' in t['plugins']:
                    if isinstance(t['plugins']['requires'], list):
                        reqs.extend(t['plugins']['requires'])
                    else:
                        reqs.append(t['plugins']['requires'])
            except KeyError:
                # Test case has no 'plugins' stanza at all.
                continue
        reqs = get_unique_item(reqs)
        return reqs

    def load_required_plugins(self, reqs, parser, args, remaining):
        '''
        Get all required plugins from the list of test cases and load any plugin
        that is not already enabled.
        '''
        pgd = ['plugin-lib', 'plugin-lib-custom']
        pnf = []

        for r in reqs:
            if r not in self.plugins:
                fname = '{}.py'.format(r)
                source_path = []
                for d in pgd:
                    pgpath = '{}/{}'.format(d, fname)
                    if os.path.isfile(pgpath):
                        source_path.append(pgpath)
                if len(source_path) == 0:
                    print('ERROR: unable to find required plugin {}'.format(r))
                    pnf.append(fname)
                    continue
                elif len(source_path) > 1:
                    # BUGFIX: the plugin name was never interpolated into
                    # this warning (the .format(r) call was missing, so a
                    # literal '{}' was printed).
                    print('WARNING: multiple copies of plugin {} found, using version found'.format(r))
                    print('at {}'.format(source_path[0]))
                pgdir = source_path[0]
                pgdir = pgdir.split('/')[0]
                self.load_plugin(pgdir, fname)
        if len(pnf) > 0:
            raise PluginDependencyException(pnf)

        # Newly loaded plugins may register their own arguments; re-parse
        # the remaining command line against the extended parser.
        parser = self.call_add_args(parser)
        (args, remaining) = parser.parse_known_args(args=remaining, namespace=args)
        return args

    def call_pre_suite(self, testcount, testidlist):
        """Run each plugin's pre_suite hook before any test executes."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        """Run post_suite hooks in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, caseinfo, *, test_skip=False):
        """Run pre_case hooks; report context and re-raise on failure."""
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(caseinfo, test_skip)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                # BUGFIX: this handler used to print the undefined name
                # 'test_ordinal', raising a NameError that masked the
                # plugin's original exception.
                print('testid is {}'.format(caseinfo['id']))
                raise

    def call_post_case(self):
        """Run post_case hooks in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        """Run pre_execute hooks just before the command under test."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        """Run post_execute hooks in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        """Let every plugin extend the argument parser; returns the parser."""
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        """Let every plugin validate the parsed arguments."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
        """Give each plugin a chance to rewrite a command before it runs."""
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    def set_args(self, args):
        """Remember the parsed arguments for later plugin checks."""
        self.args = args

    def _make_argparser(self, args):
        """Create and store a fresh top-level argument parser.

        BUGFIX: this was declared @staticmethod yet referenced self,
        which raised a NameError whenever it was called; it is now a
        regular instance method.
        """
        self.argparser = argparse.ArgumentParser(
            description='Linux TC unit tests')
0168 description='Linux TC unit tests')
0169
def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values
    """
    # string.Template leaves unknown $variables untouched
    # (safe_substitute never raises on missing keys).
    return Template(cmd).safe_substitute(NAMES)
0178
0179
def exec_cmd(args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    Returns (proc, output): proc is the completed Popen object (None for
    an empty command), output is the decoded stderr text when the command
    failed with stderr output, otherwise the decoded stdout text.
    """
    if len(command.strip()) == 0:
        return None, None
    if '$' in command:
        # Expand $VARIABLE references from the NAMES table.
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)

    try:
        (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
        if proc.returncode != 0 and len(serr) > 0:
            foutput = serr.decode("utf-8", errors="ignore")
        else:
            foutput = rawout.decode("utf-8", errors="ignore")
    except subprocess.TimeoutExpired:
        # BUGFIX: kill and reap the child on timeout; previously the
        # process was left running (the subprocess docs require kill()
        # followed by another communicate() after TimeoutExpired).
        proc.kill()
        proc.communicate()
        foutput = "Command \"{}\" timed out\n".format(command)
        proc.returncode = 255

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
0212
0213
def prepare_env(args, pm, stage, prefix, cmdlist, output=None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))

    for cmdinfo in cmdlist:
        # Each entry is either a bare command string (success == exit 0)
        # or a list of [command, allowed_exit_code, ...].
        if isinstance(cmdinfo, list):
            cmd, exit_codes = cmdinfo[0], cmdinfo[1:]
        else:
            cmd, exit_codes = cmdinfo, [0]

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        # Empty commands yield no proc; accepted exit codes are fine.
        if not proc or proc.returncode in exit_codes:
            continue

        print('', file=sys.stderr)
        print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
              file=sys.stderr)
        print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
              file=sys.stderr)
        print("returncode {}; expected {}".format(proc.returncode,
                                                  exit_codes))
        print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
        print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
        print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
        raise PluginMgrTestFail(
            stage, output,
            '"{}" did not complete successfully'.format(prefix))
0248
def run_one_test(pm, args, index, tidx):
    """Run a single test case dict (tidx) and return its TestResult.

    Flow: plugin pre_case -> setup commands -> command under test ->
    (on matching exit code) verify command with regex match ->
    teardown -> plugin post_case.
    """
    global NAMES
    result = True
    tresult = ""
    tap = ""
    res = TestResult(tidx['id'], tidx['name'])
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
        print("Test " + tidx["id"] + ": " + tidx["name"])

    # Honor a per-test 'skip: yes' flag without running any commands.
    if 'skip' in tidx:
        if tidx['skip'] == 'yes':
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg('Test case designated as skipped.')
            pm.call_pre_case(tidx, test_skip=True)
            pm.call_post_execute()
            return res

    # Make the test ID available for $TESTID substitution in commands.
    NAMES['TESTID'] = tidx['id']

    pm.call_pre_case(tidx)
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if (args.verbose > 0):
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    if p:
        exit_code = p.returncode
    else:
        # exec_cmd() returns None for an empty command string.
        exit_code = None

    pm.call_post_execute()

    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
        # Exit-code mismatch: fail without running the verify command.
        print("exit: {!r}".format(exit_code))
        print("exit: {}".format(int(tidx["expExitCode"])))

        res.set_result(ResultState.fail)
        res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        # A test passes when matchPattern occurs exactly matchCount
        # times in the verify command's output.
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        if procout:
            match_index = re.findall(match_pattern, procout)
            if len(match_index) != int(tidx["matchCount"]):
                res.set_result(ResultState.fail)
                res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
            else:
                res.set_result(ResultState.success)
        elif int(tidx["matchCount"]) != 0:
            res.set_result(ResultState.fail)
            res.set_failmsg('No output generated by verify command.')
        else:
            res.set_result(ResultState.success)

    # Teardown runs regardless of pass/fail; pass the captured output
    # along so teardown failures can report it.
    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    pm.call_post_case()

    index += 1

    # Remove the per-test ID so it can't leak into the next test.
    del(NAMES['TESTID'])
    return res
0319
def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1                 # 1-based ordinal of the test being run
    tap = ''
    badtest = None            # the test dict active when a stage failed
    stage = None
    emergency_exit = False
    emergency_exit_message = ''

    tsr = TestSuiteReport()

    # pre_suite failure is unrecoverable: report and bail out below.
    try:
        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
    except Exception as ee:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        traceback.print_tb(ex_tb)
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True
        stage = 'pre-SUITE'

    if emergency_exit:
        pm.call_post_suite(index)
        return emergency_exit_message
    if args.verbose > 1:
        print('give test rig 2 seconds to stabilize')
    time.sleep(2)
    for tidx in testlist:
        # Flower tests need a physical NIC (-d/--device); skip otherwise.
        if "flower" in tidx["category"] and args.device == None:
            errmsg = "Tests using the DEV2 variable must define the name of a "
            errmsg += "physical NIC with the -d option when running tdc.\n"
            errmsg += "Test has been skipped."
            if args.verbose > 1:
                print(errmsg)
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg(errmsg)
            tsr.add_resultdata(res)
            continue
        try:
            badtest = tidx  # in case it goes bad
            res = run_one_test(pm, args, index, tidx)
            tsr.add_resultdata(res)
        except PluginMgrTestFail as pmtf:
            # A setup/teardown stage failed: record this test as skipped,
            # dump diagnostics, and stop running further tests.
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            message = pmtf.message
            output = pmtf.output
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg(pmtf.message)
            res.set_failmsg(pmtf.output)
            tsr.add_resultdata(res)
            index += 1
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # If the run stopped early (stage failure above), mark the remaining
    # tests as skipped so the report still covers the whole suite.
    count = index

    if tcount + 1 != count:
        for tidx in testlist[count - 1:]:
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            msg = 'skipped - previous {} failed {} {}'.format(stage,
                index, badtest.get('id', '--Unknown--'))
            res.set_errormsg(msg)
            tsr.add_resultdata(res)
            count += 1

    if args.pause:
        print('Want to pause\nPress enter to continue ...')
        if input(sys.stdin):
            print('got something on stdin')

    pm.call_post_suite(index)

    return tsr
0421
def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    # De Morgan of the original not(all(...)) form.
    return any(not entry for entry in idlist)
0427
0428
def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        # A malformed file is reported and skipped, not fatal.
        print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde))
        return list()

    # When any case lacks an ID, tag every case with its source file so
    # generate_case_ids() can write the updated cases back to disk.
    idlist = get_id_list(testlist)
    if (has_blank_ids(idlist)):
        for k in testlist:
            k['filename'] = filename
    return testlist
0446
0447
def args_parse():
    """
    Create the argument parser.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
0454
0455
def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    # Output destination and tc binary location.
    parser.add_argument(
        '--outfile', type=str,
        help='Path to the file in which results should be saved. ' +
        'Default target is the current directory.')
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')

    # Group remaining options so --help reads sensibly.
    selection_group = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    action_group = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    selection_group.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    selection_group.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    selection_group.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    selection_group.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    action_group.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    action_group.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    action_group.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '--format', default='tap', const='tap', nargs='?',
        choices=['none', 'xunit', 'tap'],
        help='Specify the format for test results. (Default: TAP)')
    parser.add_argument(
        '-d', '--device',
        help='Execute test cases that use a physical device, ' +
        'where DEVICE is its name. (If not defined, tests ' +
        'that require a physical device will be skipped)')
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    return parser
0511
0512
def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.
    """

    global NAMES

    # Command-line overrides take precedence over tdc_config defaults.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    NAMES.setdefault('TIMEOUT', None)
    # A usable tc binary is mandatory; refuse to run without one.
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)
0532
0533
def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    ids = []
    for testcase in alltests:
        ids.append(testcase["id"])
    return ids
0539
0540
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns one entry per occurrence of an ID seen more than once.
    """
    idl = get_id_list(alltests)
    return [dup for dup in idl if idl.count(dup) > 1]
0547
0548
def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    existing = get_id_list(alltests)
    return any(candidate == newid for candidate in existing)
0555
0556
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Returns the (mutated in place) list of test cases so callers may
    rebind their reference to it.
    """
    import random
    for c in alltests:
        if (c["id"] == ""):
            # Draw 4-hex-digit IDs until one is not already in use.
            while True:
                newid = str('{:04x}'.format(random.randrange(16**4)))
                if (does_id_exist(alltests, newid)):
                    continue
                else:
                    c['id'] = newid
                    break

    # Collect the distinct source files, then rewrite each one with its
    # own (now fully ID'd) test cases.
    ufilename = []
    for c in alltests:
        if ('filename' in c):
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t:
                if t['filename'] == f:
                    del t['filename']
                    testlist.append(t)
        # BUGFIX: use a context manager so the file is closed even if
        # json.dump raises.
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)
            outfile.write("\n")
    # BUGFIX: previously returned None, but set_operation_mode() assigns
    # the return value back to its test list, which then crashed later
    # (e.g. check_case_id(None)).
    return alltests
0589
def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    # No tests or no -e/--execute IDs given: nothing to select.
    if not testlist or not args.execute:
        return list()

    wanted = args.execute
    if not isinstance(wanted, list) or len(wanted) == 0:
        return list()

    return [tc for tc in testlist if tc['id'] in wanted]
0602
def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.
    '''
    answer = list()
    if not (args.category and testlist):
        return answer

    seen_ids = list()
    for catg in set(args.category):
        if catg == '+c':
            # '+c' is the default sentinel, not a real category.
            continue
        print('considering category {}'.format(catg))
        for tc in testlist:
            # Dedupe: a test may belong to several selected categories.
            if catg in tc['category'] and tc['id'] not in seen_ids:
                answer.append(tc)
                seen_ids.append(tc['id'])

    return answer
0620
0621
def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # At least one file was given explicitly; drop the default
        # search directory so only the named files are used.
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    # Collect every *.json test file under the search directories.
    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                # NOTE(review): this compares against testdirs, which
                # looks like it was meant to deduplicate against flist —
                # confirm intent before changing.
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    # Merge the two filters: -e IDs are added on top of any category
    # selection, without duplicating tests already picked by category.
    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # No filtering requested: keep everything that was loaded
            # from the selected files/directories.
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases
0681
0682
def set_operation_mode(pm, parser, args, remaining):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    Terminates the process via exit(): 0 on success/informational modes,
    1 on test failures or duplicate IDs, 4 when no tests were found.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if (has_blank_ids(idlist)):
            # BUGFIX: generate_case_ids() mutates alltests in place;
            # assigning its (previously None) return value here clobbered
            # the test list.
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if (len(duplicate_ids) > 0):
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    # A bare '-c' (empty category list) means "list known categories".
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    if args.list:
        list_test_cases(alltests)
        exit(0)

    exit_code = 0  # KSFT_PASS
    if len(alltests):
        req_plugins = pm.get_required_plugins(alltests)
        try:
            args = pm.load_required_plugins(req_plugins, parser, args, remaining)
        except PluginDependencyException as pde:
            print('The following plugins were not found:')
            print('{}'.format(pde.missing_pg))
        catresults = test_runner(pm, args, alltests)
        if catresults.count_failures() != 0:
            exit_code = 1  # KSFT_FAIL
        if args.format == 'none':
            print('Test results output suppression requested\n')
        else:
            print('\nAll test results: \n')
            if args.format == 'xunit':
                suffix = 'xml'
                res = catresults.format_xunit()
            elif args.format == 'tap':
                suffix = 'tap'
                res = catresults.format_tap()
            print(res)
            print('\n\n')
            if not args.outfile:
                fname = 'test-results.{}'.format(suffix)
            else:
                fname = args.outfile
            # The context manager closes the file; the old explicit
            # fh.close() inside the with-block was redundant.
            with open(fname, 'w') as fh:
                fh.write(res)
            # When run under sudo, hand the results file back to the
            # invoking user so it isn't left root-owned.
            if os.getenv('SUDO_UID') is not None:
                os.chown(fname, uid=int(os.getenv('SUDO_UID')),
                    gid=int(os.getenv('SUDO_GID')))
    else:
        print('No tests found\n')
        exit_code = 4  # KSFT_SKIP
    exit(exit_code)
0756
def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    # Auto-loaded plugins may register extra command-line arguments.
    parser = pm.call_add_args(parser)
    # parse_known_args: unknown options are kept for plugins loaded later.
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    pm.set_args(args)
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, parser, args, remaining)
0774
# Script entry point when invoked directly (not imported).
if __name__ == "__main__":
    main()