Back to home page

OSCL-LXR

 
 

    


0001 #!/usr/bin/env python3
0002 # SPDX-License-Identifier: GPL-2.0
0003 
0004 """
0005 tdc.py - Linux tc (Traffic Control) unit test driver
0006 
0007 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
0008 """
0009 
0010 import re
0011 import os
0012 import sys
0013 import argparse
0014 import importlib
0015 import json
0016 import subprocess
0017 import time
0018 import traceback
0019 from collections import OrderedDict
0020 from string import Template
0021 
0022 from tdc_config import *
0023 from tdc_helper import *
0024 
0025 import TdcPlugin
0026 from TdcResults import *
0027 
0028 class PluginDependencyException(Exception):
0029     def __init__(self, missing_pg):
0030         self.missing_pg = missing_pg
0031 
0032 class PluginMgrTestFail(Exception):
0033     def __init__(self, stage, output, message):
0034         self.stage = stage
0035         self.output = output
0036         self.message = message
0037 
0038 class PluginMgr:
0039     def __init__(self, argparser):
0040         super().__init__()
0041         self.plugins = {}
0042         self.plugin_instances = []
0043         self.failed_plugins = {}
0044         self.argparser = argparser
0045 
0046         # TODO, put plugins in order
0047         plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
0048         for dirpath, dirnames, filenames in os.walk(plugindir):
0049             for fn in filenames:
0050                 if (fn.endswith('.py') and
0051                     not fn == '__init__.py' and
0052                     not fn.startswith('#') and
0053                     not fn.startswith('.#')):
0054                     mn = fn[0:-3]
0055                     foo = importlib.import_module('plugins.' + mn)
0056                     self.plugins[mn] = foo
0057                     self.plugin_instances.append(foo.SubPlugin())
0058 
0059     def load_plugin(self, pgdir, pgname):
0060         pgname = pgname[0:-3]
0061         foo = importlib.import_module('{}.{}'.format(pgdir, pgname))
0062         self.plugins[pgname] = foo
0063         self.plugin_instances.append(foo.SubPlugin())
0064         self.plugin_instances[-1].check_args(self.args, None)
0065 
0066     def get_required_plugins(self, testlist):
0067         '''
0068         Get all required plugins from the list of test cases and return
0069         all unique items.
0070         '''
0071         reqs = []
0072         for t in testlist:
0073             try:
0074                 if 'requires' in t['plugins']:
0075                     if isinstance(t['plugins']['requires'], list):
0076                         reqs.extend(t['plugins']['requires'])
0077                     else:
0078                         reqs.append(t['plugins']['requires'])
0079             except KeyError:
0080                 continue
0081         reqs = get_unique_item(reqs)
0082         return reqs
0083 
0084     def load_required_plugins(self, reqs, parser, args, remaining):
0085         '''
0086         Get all required plugins from the list of test cases and load any plugin
0087         that is not already enabled.
0088         '''
0089         pgd = ['plugin-lib', 'plugin-lib-custom']
0090         pnf = []
0091 
0092         for r in reqs:
0093             if r not in self.plugins:
0094                 fname = '{}.py'.format(r)
0095                 source_path = []
0096                 for d in pgd:
0097                     pgpath = '{}/{}'.format(d, fname)
0098                     if os.path.isfile(pgpath):
0099                         source_path.append(pgpath)
0100                 if len(source_path) == 0:
0101                     print('ERROR: unable to find required plugin {}'.format(r))
0102                     pnf.append(fname)
0103                     continue
0104                 elif len(source_path) > 1:
0105                     print('WARNING: multiple copies of plugin {} found, using version found')
0106                     print('at {}'.format(source_path[0]))
0107                 pgdir = source_path[0]
0108                 pgdir = pgdir.split('/')[0]
0109                 self.load_plugin(pgdir, fname)
0110         if len(pnf) > 0:
0111             raise PluginDependencyException(pnf)
0112 
0113         parser = self.call_add_args(parser)
0114         (args, remaining) = parser.parse_known_args(args=remaining, namespace=args)
0115         return args
0116 
0117     def call_pre_suite(self, testcount, testidlist):
0118         for pgn_inst in self.plugin_instances:
0119             pgn_inst.pre_suite(testcount, testidlist)
0120 
0121     def call_post_suite(self, index):
0122         for pgn_inst in reversed(self.plugin_instances):
0123             pgn_inst.post_suite(index)
0124 
0125     def call_pre_case(self, caseinfo, *, test_skip=False):
0126         for pgn_inst in self.plugin_instances:
0127             try:
0128                 pgn_inst.pre_case(caseinfo, test_skip)
0129             except Exception as ee:
0130                 print('exception {} in call to pre_case for {} plugin'.
0131                       format(ee, pgn_inst.__class__))
0132                 print('test_ordinal is {}'.format(test_ordinal))
0133                 print('testid is {}'.format(caseinfo['id']))
0134                 raise
0135 
0136     def call_post_case(self):
0137         for pgn_inst in reversed(self.plugin_instances):
0138             pgn_inst.post_case()
0139 
0140     def call_pre_execute(self):
0141         for pgn_inst in self.plugin_instances:
0142             pgn_inst.pre_execute()
0143 
0144     def call_post_execute(self):
0145         for pgn_inst in reversed(self.plugin_instances):
0146             pgn_inst.post_execute()
0147 
0148     def call_add_args(self, parser):
0149         for pgn_inst in self.plugin_instances:
0150             parser = pgn_inst.add_args(parser)
0151         return parser
0152 
0153     def call_check_args(self, args, remaining):
0154         for pgn_inst in self.plugin_instances:
0155             pgn_inst.check_args(args, remaining)
0156 
0157     def call_adjust_command(self, stage, command):
0158         for pgn_inst in self.plugin_instances:
0159             command = pgn_inst.adjust_command(stage, command)
0160         return command
0161 
0162     def set_args(self, args):
0163         self.args = args
0164 
0165     @staticmethod
0166     def _make_argparser(args):
0167         self.argparser = argparse.ArgumentParser(
0168             description='Linux TC unit tests')
0169 
0170 def replace_keywords(cmd):
0171     """
0172     For a given executable command, substitute any known
0173     variables contained within NAMES with the correct values
0174     """
0175     tcmd = Template(cmd)
0176     subcmd = tcmd.safe_substitute(NAMES)
0177     return subcmd
0178 
0179 
0180 def exec_cmd(args, pm, stage, command):
0181     """
0182     Perform any required modifications on an executable command, then run
0183     it in a subprocess and return the results.
0184     """
0185     if len(command.strip()) == 0:
0186         return None, None
0187     if '$' in command:
0188         command = replace_keywords(command)
0189 
0190     command = pm.call_adjust_command(stage, command)
0191     if args.verbose > 0:
0192         print('command "{}"'.format(command))
0193     proc = subprocess.Popen(command,
0194         shell=True,
0195         stdout=subprocess.PIPE,
0196         stderr=subprocess.PIPE,
0197         env=ENVIR)
0198 
0199     try:
0200         (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
0201         if proc.returncode != 0 and len(serr) > 0:
0202             foutput = serr.decode("utf-8", errors="ignore")
0203         else:
0204             foutput = rawout.decode("utf-8", errors="ignore")
0205     except subprocess.TimeoutExpired:
0206         foutput = "Command \"{}\" timed out\n".format(command)
0207         proc.returncode = 255
0208 
0209     proc.stdout.close()
0210     proc.stderr.close()
0211     return proc, foutput
0212 
0213 
0214 def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
0215     """
0216     Execute the setup/teardown commands for a test case.
0217     Optionally terminate test execution if the command fails.
0218     """
0219     if args.verbose > 0:
0220         print('{}'.format(prefix))
0221     for cmdinfo in cmdlist:
0222         if isinstance(cmdinfo, list):
0223             exit_codes = cmdinfo[1:]
0224             cmd = cmdinfo[0]
0225         else:
0226             exit_codes = [0]
0227             cmd = cmdinfo
0228 
0229         if not cmd:
0230             continue
0231 
0232         (proc, foutput) = exec_cmd(args, pm, stage, cmd)
0233 
0234         if proc and (proc.returncode not in exit_codes):
0235             print('', file=sys.stderr)
0236             print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
0237                   file=sys.stderr)
0238             print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
0239                   file=sys.stderr)
0240             print("returncode {}; expected {}".format(proc.returncode,
0241                                                       exit_codes))
0242             print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
0243             print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
0244             print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
0245             raise PluginMgrTestFail(
0246                 stage, output,
0247                 '"{}" did not complete successfully'.format(prefix))
0248 
0249 def run_one_test(pm, args, index, tidx):
0250     global NAMES
0251     result = True
0252     tresult = ""
0253     tap = ""
0254     res = TestResult(tidx['id'], tidx['name'])
0255     if args.verbose > 0:
0256         print("\t====================\n=====> ", end="")
0257     print("Test " + tidx["id"] + ": " + tidx["name"])
0258 
0259     if 'skip' in tidx:
0260         if tidx['skip'] == 'yes':
0261             res = TestResult(tidx['id'], tidx['name'])
0262             res.set_result(ResultState.skip)
0263             res.set_errormsg('Test case designated as skipped.')
0264             pm.call_pre_case(tidx, test_skip=True)
0265             pm.call_post_execute()
0266             return res
0267 
0268     # populate NAMES with TESTID for this test
0269     NAMES['TESTID'] = tidx['id']
0270 
0271     pm.call_pre_case(tidx)
0272     prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])
0273 
0274     if (args.verbose > 0):
0275         print('-----> execute stage')
0276     pm.call_pre_execute()
0277     (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
0278     if p:
0279         exit_code = p.returncode
0280     else:
0281         exit_code = None
0282 
0283     pm.call_post_execute()
0284 
0285     if (exit_code is None or exit_code != int(tidx["expExitCode"])):
0286         print("exit: {!r}".format(exit_code))
0287         print("exit: {}".format(int(tidx["expExitCode"])))
0288         #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
0289         res.set_result(ResultState.fail)
0290         res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
0291         print(procout)
0292     else:
0293         if args.verbose > 0:
0294             print('-----> verify stage')
0295         match_pattern = re.compile(
0296             str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
0297         (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
0298         if procout:
0299             match_index = re.findall(match_pattern, procout)
0300             if len(match_index) != int(tidx["matchCount"]):
0301                 res.set_result(ResultState.fail)
0302                 res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
0303             else:
0304                 res.set_result(ResultState.success)
0305         elif int(tidx["matchCount"]) != 0:
0306             res.set_result(ResultState.fail)
0307             res.set_failmsg('No output generated by verify command.')
0308         else:
0309             res.set_result(ResultState.success)
0310 
0311     prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
0312     pm.call_post_case()
0313 
0314     index += 1
0315 
0316     # remove TESTID from NAMES
0317     del(NAMES['TESTID'])
0318     return res
0319 
0320 def test_runner(pm, args, filtered_tests):
0321     """
0322     Driver function for the unit tests.
0323 
0324     Prints information about the tests being run, executes the setup and
0325     teardown commands and the command under test itself. Also determines
0326     success/failure based on the information in the test case and generates
0327     TAP output accordingly.
0328     """
0329     testlist = filtered_tests
0330     tcount = len(testlist)
0331     index = 1
0332     tap = ''
0333     badtest = None
0334     stage = None
0335     emergency_exit = False
0336     emergency_exit_message = ''
0337 
0338     tsr = TestSuiteReport()
0339 
0340     try:
0341         pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
0342     except Exception as ee:
0343         ex_type, ex, ex_tb = sys.exc_info()
0344         print('Exception {} {} (caught in pre_suite).'.
0345               format(ex_type, ex))
0346         traceback.print_tb(ex_tb)
0347         emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
0348         emergency_exit = True
0349         stage = 'pre-SUITE'
0350 
0351     if emergency_exit:
0352         pm.call_post_suite(index)
0353         return emergency_exit_message
0354     if args.verbose > 1:
0355         print('give test rig 2 seconds to stabilize')
0356     time.sleep(2)
0357     for tidx in testlist:
0358         if "flower" in tidx["category"] and args.device == None:
0359             errmsg = "Tests using the DEV2 variable must define the name of a "
0360             errmsg += "physical NIC with the -d option when running tdc.\n"
0361             errmsg += "Test has been skipped."
0362             if args.verbose > 1:
0363                 print(errmsg)
0364             res = TestResult(tidx['id'], tidx['name'])
0365             res.set_result(ResultState.skip)
0366             res.set_errormsg(errmsg)
0367             tsr.add_resultdata(res)
0368             continue
0369         try:
0370             badtest = tidx  # in case it goes bad
0371             res = run_one_test(pm, args, index, tidx)
0372             tsr.add_resultdata(res)
0373         except PluginMgrTestFail as pmtf:
0374             ex_type, ex, ex_tb = sys.exc_info()
0375             stage = pmtf.stage
0376             message = pmtf.message
0377             output = pmtf.output
0378             res = TestResult(tidx['id'], tidx['name'])
0379             res.set_result(ResultState.skip)
0380             res.set_errormsg(pmtf.message)
0381             res.set_failmsg(pmtf.output)
0382             tsr.add_resultdata(res)
0383             index += 1
0384             print(message)
0385             print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
0386                   format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
0387             print('---------------')
0388             print('traceback')
0389             traceback.print_tb(ex_tb)
0390             print('---------------')
0391             if stage == 'teardown':
0392                 print('accumulated output for this test:')
0393                 if pmtf.output:
0394                     print(pmtf.output)
0395             print('---------------')
0396             break
0397         index += 1
0398 
0399     # if we failed in setup or teardown,
0400     # fill in the remaining tests with ok-skipped
0401     count = index
0402 
0403     if tcount + 1 != count:
0404         for tidx in testlist[count - 1:]:
0405             res = TestResult(tidx['id'], tidx['name'])
0406             res.set_result(ResultState.skip)
0407             msg = 'skipped - previous {} failed {} {}'.format(stage,
0408                 index, badtest.get('id', '--Unknown--'))
0409             res.set_errormsg(msg)
0410             tsr.add_resultdata(res)
0411             count += 1
0412 
0413     if args.pause:
0414         print('Want to pause\nPress enter to continue ...')
0415         if input(sys.stdin):
0416             print('got something on stdin')
0417 
0418     pm.call_post_suite(index)
0419 
0420     return tsr
0421 
0422 def has_blank_ids(idlist):
0423     """
0424     Search the list for empty ID fields and return true/false accordingly.
0425     """
0426     return not(all(k for k in idlist))
0427 
0428 
0429 def load_from_file(filename):
0430     """
0431     Open the JSON file containing the test cases and return them
0432     as list of ordered dictionary objects.
0433     """
0434     try:
0435         with open(filename) as test_data:
0436             testlist = json.load(test_data, object_pairs_hook=OrderedDict)
0437     except json.JSONDecodeError as jde:
0438         print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
0439         testlist = list()
0440     else:
0441         idlist = get_id_list(testlist)
0442         if (has_blank_ids(idlist)):
0443             for k in testlist:
0444                 k['filename'] = filename
0445     return testlist
0446 
0447 
0448 def args_parse():
0449     """
0450     Create the argument parser.
0451     """
0452     parser = argparse.ArgumentParser(description='Linux TC unit tests')
0453     return parser
0454 
0455 
0456 def set_args(parser):
0457     """
0458     Set the command line arguments for tdc.
0459     """
0460     parser.add_argument(
0461         '--outfile', type=str,
0462         help='Path to the file in which results should be saved. ' +
0463         'Default target is the current directory.')
0464     parser.add_argument(
0465         '-p', '--path', type=str,
0466         help='The full path to the tc executable to use')
0467     sg = parser.add_argument_group(
0468         'selection', 'select which test cases: ' +
0469         'files plus directories; filtered by categories plus testids')
0470     ag = parser.add_argument_group(
0471         'action', 'select action to perform on selected test cases')
0472 
0473     sg.add_argument(
0474         '-D', '--directory', nargs='+', metavar='DIR',
0475         help='Collect tests from the specified directory(ies) ' +
0476         '(default [tc-tests])')
0477     sg.add_argument(
0478         '-f', '--file', nargs='+', metavar='FILE',
0479         help='Run tests from the specified file(s)')
0480     sg.add_argument(
0481         '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
0482         help='Run tests only from the specified category/ies, ' +
0483         'or if no category/ies is/are specified, list known categories.')
0484     sg.add_argument(
0485         '-e', '--execute', nargs='+', metavar='ID',
0486         help='Execute the specified test cases with specified IDs')
0487     ag.add_argument(
0488         '-l', '--list', action='store_true',
0489         help='List all test cases, or those only within the specified category')
0490     ag.add_argument(
0491         '-s', '--show', action='store_true', dest='showID',
0492         help='Display the selected test cases')
0493     ag.add_argument(
0494         '-i', '--id', action='store_true', dest='gen_id',
0495         help='Generate ID numbers for new test cases')
0496     parser.add_argument(
0497         '-v', '--verbose', action='count', default=0,
0498         help='Show the commands that are being run')
0499     parser.add_argument(
0500         '--format', default='tap', const='tap', nargs='?',
0501         choices=['none', 'xunit', 'tap'],
0502         help='Specify the format for test results. (Default: TAP)')
0503     parser.add_argument('-d', '--device',
0504                         help='Execute test cases that use a physical device, ' +
0505                         'where DEVICE is its name. (If not defined, tests ' +
0506                         'that require a physical device will be skipped)')
0507     parser.add_argument(
0508         '-P', '--pause', action='store_true',
0509         help='Pause execution just before post-suite stage')
0510     return parser
0511 
0512 
0513 def check_default_settings(args, remaining, pm):
0514     """
0515     Process any arguments overriding the default settings,
0516     and ensure the settings are correct.
0517     """
0518     # Allow for overriding specific settings
0519     global NAMES
0520 
0521     if args.path != None:
0522         NAMES['TC'] = args.path
0523     if args.device != None:
0524         NAMES['DEV2'] = args.device
0525     if 'TIMEOUT' not in NAMES:
0526         NAMES['TIMEOUT'] = None
0527     if not os.path.isfile(NAMES['TC']):
0528         print("The specified tc path " + NAMES['TC'] + " does not exist.")
0529         exit(1)
0530 
0531     pm.call_check_args(args, remaining)
0532 
0533 
0534 def get_id_list(alltests):
0535     """
0536     Generate a list of all IDs in the test cases.
0537     """
0538     return [x["id"] for x in alltests]
0539 
0540 
0541 def check_case_id(alltests):
0542     """
0543     Check for duplicate test case IDs.
0544     """
0545     idl = get_id_list(alltests)
0546     return [x for x in idl if idl.count(x) > 1]
0547 
0548 
0549 def does_id_exist(alltests, newid):
0550     """
0551     Check if a given ID already exists in the list of test cases.
0552     """
0553     idl = get_id_list(alltests)
0554     return (any(newid == x for x in idl))
0555 
0556 
0557 def generate_case_ids(alltests):
0558     """
0559     If a test case has a blank ID field, generate a random hex ID for it
0560     and then write the test cases back to disk.
0561     """
0562     import random
0563     for c in alltests:
0564         if (c["id"] == ""):
0565             while True:
0566                 newid = str('{:04x}'.format(random.randrange(16**4)))
0567                 if (does_id_exist(alltests, newid)):
0568                     continue
0569                 else:
0570                     c['id'] = newid
0571                     break
0572 
0573     ufilename = []
0574     for c in alltests:
0575         if ('filename' in c):
0576             ufilename.append(c['filename'])
0577     ufilename = get_unique_item(ufilename)
0578     for f in ufilename:
0579         testlist = []
0580         for t in alltests:
0581             if 'filename' in t:
0582                 if t['filename'] == f:
0583                     del t['filename']
0584                     testlist.append(t)
0585         outfile = open(f, "w")
0586         json.dump(testlist, outfile, indent=4)
0587         outfile.write("\n")
0588         outfile.close()
0589 
0590 def filter_tests_by_id(args, testlist):
0591     '''
0592     Remove tests from testlist that are not in the named id list.
0593     If id list is empty, return empty list.
0594     '''
0595     newlist = list()
0596     if testlist and args.execute:
0597         target_ids = args.execute
0598 
0599         if isinstance(target_ids, list) and (len(target_ids) > 0):
0600             newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
0601     return newlist
0602 
0603 def filter_tests_by_category(args, testlist):
0604     '''
0605     Remove tests from testlist that are not in a named category.
0606     '''
0607     answer = list()
0608     if args.category and testlist:
0609         test_ids = list()
0610         for catg in set(args.category):
0611             if catg == '+c':
0612                 continue
0613             print('considering category {}'.format(catg))
0614             for tc in testlist:
0615                 if catg in tc['category'] and tc['id'] not in test_ids:
0616                     answer.append(tc)
0617                     test_ids.append(tc['id'])
0618 
0619     return answer
0620 
0621 
0622 def get_test_cases(args):
0623     """
0624     If a test case file is specified, retrieve tests from that file.
0625     Otherwise, glob for all json files in subdirectories and load from
0626     each one.
0627     Also, if requested, filter by category, and add tests matching
0628     certain ids.
0629     """
0630     import fnmatch
0631 
0632     flist = []
0633     testdirs = ['tc-tests']
0634 
0635     if args.file:
0636         # at least one file was specified - remove the default directory
0637         testdirs = []
0638 
0639         for ff in args.file:
0640             if not os.path.isfile(ff):
0641                 print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
0642             else:
0643                 flist.append(os.path.abspath(ff))
0644 
0645     if args.directory:
0646         testdirs = args.directory
0647 
0648     for testdir in testdirs:
0649         for root, dirnames, filenames in os.walk(testdir):
0650             for filename in fnmatch.filter(filenames, '*.json'):
0651                 candidate = os.path.abspath(os.path.join(root, filename))
0652                 if candidate not in testdirs:
0653                     flist.append(candidate)
0654 
0655     alltestcases = list()
0656     for casefile in flist:
0657         alltestcases = alltestcases + (load_from_file(casefile))
0658 
0659     allcatlist = get_test_categories(alltestcases)
0660     allidlist = get_id_list(alltestcases)
0661 
0662     testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
0663     idtestcases = filter_tests_by_id(args, alltestcases)
0664     cattestcases = filter_tests_by_category(args, alltestcases)
0665 
0666     cat_ids = [x['id'] for x in cattestcases]
0667     if args.execute:
0668         if args.category:
0669             alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
0670         else:
0671             alltestcases = idtestcases
0672     else:
0673         if cat_ids:
0674             alltestcases = cattestcases
0675         else:
0676             # just accept the existing value of alltestcases,
0677             # which has been filtered by file/directory
0678             pass
0679 
0680     return allcatlist, allidlist, testcases_by_cats, alltestcases
0681 
0682 
0683 def set_operation_mode(pm, parser, args, remaining):
0684     """
0685     Load the test case data and process remaining arguments to determine
0686     what the script should do for this run, and call the appropriate
0687     function.
0688     """
0689     ucat, idlist, testcases, alltests = get_test_cases(args)
0690 
0691     if args.gen_id:
0692         if (has_blank_ids(idlist)):
0693             alltests = generate_case_ids(alltests)
0694         else:
0695             print("No empty ID fields found in test files.")
0696         exit(0)
0697 
0698     duplicate_ids = check_case_id(alltests)
0699     if (len(duplicate_ids) > 0):
0700         print("The following test case IDs are not unique:")
0701         print(str(set(duplicate_ids)))
0702         print("Please correct them before continuing.")
0703         exit(1)
0704 
0705     if args.showID:
0706         for atest in alltests:
0707             print_test_case(atest)
0708         exit(0)
0709 
0710     if isinstance(args.category, list) and (len(args.category) == 0):
0711         print("Available categories:")
0712         print_sll(ucat)
0713         exit(0)
0714 
0715     if args.list:
0716         list_test_cases(alltests)
0717         exit(0)
0718 
0719     exit_code = 0 # KSFT_PASS
0720     if len(alltests):
0721         req_plugins = pm.get_required_plugins(alltests)
0722         try:
0723             args = pm.load_required_plugins(req_plugins, parser, args, remaining)
0724         except PluginDependencyException as pde:
0725             print('The following plugins were not found:')
0726             print('{}'.format(pde.missing_pg))
0727         catresults = test_runner(pm, args, alltests)
0728         if catresults.count_failures() != 0:
0729             exit_code = 1 # KSFT_FAIL
0730         if args.format == 'none':
0731             print('Test results output suppression requested\n')
0732         else:
0733             print('\nAll test results: \n')
0734             if args.format == 'xunit':
0735                 suffix = 'xml'
0736                 res = catresults.format_xunit()
0737             elif args.format == 'tap':
0738                 suffix = 'tap'
0739                 res = catresults.format_tap()
0740             print(res)
0741             print('\n\n')
0742             if not args.outfile:
0743                 fname = 'test-results.{}'.format(suffix)
0744             else:
0745                 fname = args.outfile
0746             with open(fname, 'w') as fh:
0747                 fh.write(res)
0748                 fh.close()
0749                 if os.getenv('SUDO_UID') is not None:
0750                     os.chown(fname, uid=int(os.getenv('SUDO_UID')),
0751                         gid=int(os.getenv('SUDO_GID')))
0752     else:
0753         print('No tests found\n')
0754         exit_code = 4 # KSFT_SKIP
0755     exit(exit_code)
0756 
0757 def main():
0758     """
0759     Start of execution; set up argument parser and get the arguments,
0760     and start operations.
0761     """
0762     parser = args_parse()
0763     parser = set_args(parser)
0764     pm = PluginMgr(parser)
0765     parser = pm.call_add_args(parser)
0766     (args, remaining) = parser.parse_known_args()
0767     args.NAMES = NAMES
0768     pm.set_args(args)
0769     check_default_settings(args, remaining, pm)
0770     if args.verbose > 2:
0771         print('args is {}'.format(args))
0772 
0773     set_operation_mode(pm, parser, args, remaining)
0774 
0775 if __name__ == "__main__":
0776     main()