Back to home page

OSCL-LXR

 
 

    


0001 #!/usr/bin/env python3
0002 # SPDX-License-Identifier: GPL-2.0
0003 #
0004 # Program to allow users to fuzz test Hyper-V drivers
0005 # by interfacing with Hyper-V debugfs attributes.
0006 # Current test methods available:
0007 #       1. delay testing
0008 #
0009 # Current file/directory structure of hyper-V debugfs:
0010 #       /sys/kernel/debug/hyperv/UUID
0011 #       /sys/kernel/debug/hyperv/UUID/<test-state filename>
0012 #       /sys/kernel/debug/hyperv/UUID/<test-method sub-directory>
0013 #
0014 # author: Branden Bonaby <brandonbonaby94@gmail.com>
0015 
0016 import os
0017 import cmd
0018 import argparse
0019 import glob
0020 from argparse import RawDescriptionHelpFormatter
0021 from argparse import RawTextHelpFormatter
0022 from enum import Enum
0023 
# Do not change unless you also change the debugfs attributes
# in /drivers/hv/debugfs.c. All fuzz testing
# attributes will start with "fuzz_test".
0027 
# Root of the Hyper-V debugfs tree. Everything this tool reads or writes
# lives below it, so stop right away when the directory cannot be seen.
debugfs_hyperv_path = "/sys/kernel/debug/hyperv"
if not os.path.isdir(debugfs_hyperv_path):
        # Either the directory is missing or the caller may not access it.
        print("{} doesn't exist/check permissions".format(debugfs_hyperv_path))
        exit(-1)
0033 
class dev_state(Enum):
        """Values stored in a device's test-state file."""

        # Testing is disabled for the device.
        off = 0
        # Testing is enabled for the device.
        on = 1
0037 
# File names that correspond to the files created in
# /drivers/hv/debugfs.c
class f_names(Enum):
        """Names of the debugfs attribute files used by this tool."""

        # Holds the on/off testing state of a device.
        state_f = "fuzz_test_state"
        # Holds the ring buffer interrupt delay.
        buff_f = "fuzz_test_buffer_interrupt_delay"
        # Holds the ring buffer message delay.
        mess_f = "fuzz_test_message_delay"
0044 
0045 # Both single_actions and all_actions are used
0046 # for error checking and to allow for some subparser
0047 # names to be abbreviated. Do not abbreviate the
0048 # test method names, as it will become less intuitive
0049 # as to what the user can do. If you do decide to
0050 # abbreviate the test method name, make sure the main
0051 # function reflects this change.
0052 
# Option and sub-command names (with their one-letter aliases) that act
# on every vmbus device and therefore must NOT be given a path.
all_actions = [
        "disable_all", "D",
        "enable_all",
        "view_all", "V",
]
0060 
# Option and sub-command names (with their one-letter aliases) that act
# on one vmbus device and therefore REQUIRE a path.
single_actions = [
        "disable_single", "d",
        "enable_single",
        "view_single", "v",
]
0068 
def main():
        """Parse the command line and run the requested action.

        The debugfs tree is scanned first, then the arguments are parsed
        and validated against the discovered devices, and finally the
        selected sub-command is dispatched.
        """
        file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
        args = parse_args()
        if (not args.action):
                print ("Error, no options selected...exiting")
                exit(-1)

        # Names of every option that carries a truthy value, plus the
        # name of the chosen sub-command itself.
        arg_set = set()
        for (key, value) in vars(args).items():
                if value and key != "action":
                        arg_set.add(key)
        arg_set.add(args.action)

        path = None
        if "path" in arg_set:
                path = args.path
        # The keys of file_map carry no trailing slash, so drop one if given.
        if (path and path.endswith("/")):
                path = path[:-1]
        validate_args_path(path, arg_set, file_map)

        # Testing has to be switched on for the device before a test
        # method is applied to it.
        if (path and "enable_single" in arg_set):
                set_test_state(locate_state(path, file_map),
                               dev_state.on.value, args.quiet)

        # The sub-command name selects the action to run.
        if ("delay" in arg_set):
                validate_delay_values(args.delay_time)
                if (args.enable_all):
                        set_delay_all_devices(file_map, args.delay_time,
                                              args.quiet)
                else:
                        set_delay_values(path, file_map, args.delay_time,
                                         args.quiet)
                return
        if (arg_set & {"disable_all", "D"}):
                disable_all_testing(file_map)
                return
        if (arg_set & {"disable_single", "d"}):
                disable_testing_single_device(path, file_map)
                return
        if (arg_set & {"view_all", "V"}):
                get_all_devices_test_status(file_map)
                return
        if (arg_set & {"view_single", "v"}):
                get_device_test_values(path, file_map)
0103 
0104 # Get the state location
def locate_state(device, file_map):
        """Return the path of the test-state file recorded for *device*.

        Raises KeyError when the device, or its state file, is not present
        in *file_map*.
        """
        device_files = file_map[device]
        return device_files[f_names.state_f.value]
0107 
0108 # Validate delay values to make sure they are acceptable to
0109 # enable delays on a device
def validate_delay_values(delay):
        """Exit with status -1 unless *delay* is usable for delay testing.

        delay[0] and delay[1] must not both be -1, and every entry has to
        be either -1 or lie in the range 1..1000. Returns None when the
        values are acceptable.
        """
        both_unset = delay[0] == -1 and delay[1] == -1
        if (both_unset):
                print("\nError, At least 1 value must be greater than 0")
                exit(-1)

        for value in delay:
                acceptable = value == -1 or 0 < value <= 1000
                if (not acceptable):
                        print("\nError, Values must be  equal to -1 "
                              "or be > 0 and <= 1000")
                        exit(-1)
0120 
0121 # Validate argument path
def validate_args_path(path, arg_set, file_map):
        """Exit with status -1 when *path* does not fit the chosen action.

        Single-device actions need a path that is a key of *file_map*;
        all-device actions must be given no path at all.
        """
        wants_single = any(name in arg_set for name in single_actions)
        wants_all = any(name in arg_set for name in all_actions)

        if (wants_single and not path):
                print("Error, path (-p) REQUIRED for the specified option. "
                      "Use (-h) to check usage.")
                exit(-1)
        if (wants_all and path):
                print("Error, path (-p) NOT REQUIRED for the specified option. "
                      "Use (-h) to check usage." )
                exit(-1)
        if (wants_single and path not in file_map):
                print("Error, path '{}' not a valid vmbus device".format(path))
                exit(-1)
0136 
0137 # display Testing status of single device
def get_device_test_values(path, file_map):
        """Print "<file name> = <value>" for every file of one device."""
        for name, file_location in file_map[path].items():
                current_value = read_test_files(file_location)
                print( name + " = " + str(current_value))
0143 
0144 # Create a map of the vmbus devices and their associated files
0145 # [key=device, value = [key = filename, value = file path]]
def recursive_file_lookup(path, file_map):
        """Fill *file_map* with the debugfs files found below *path*.

        The result maps each device directory to a dict of
        {file name: absolute file path}. A file that sits in a test-method
        sub-directory is recorded under its device directory rather than
        under the sub-directory itself.

        path:     directory to scan (the debugfs hyperv root on first call).
        file_map: dict to fill in; it is updated in place and also returned.
        """
        # Build the pattern with a real path separator. Plain concatenation
        # (path + '**/*') yields "<path>*/*", which also descends into
        # sibling directories whose names merely start with the same prefix.
        for f_path in glob.iglob(os.path.join(path, "*")):
                if (os.path.isfile(f_path)):
                        # A file directly inside a device directory has the
                        # debugfs root two levels up; otherwise it lives one
                        # level deeper, in a test-method sub-directory.
                        if (f_path.rsplit("/", 2)[0] == debugfs_hyperv_path):
                                directory = f_path.rsplit("/", 1)[0]
                        else:
                                directory = f_path.rsplit("/", 2)[0]
                        f_name = f_path.split("/")[-1]
                        file_map.setdefault(directory, {})[f_name] = f_path
                elif (os.path.isdir(f_path)):
                        recursive_file_lookup(f_path, file_map)
        return file_map
0162 
0163 # display Testing state of devices
def get_all_devices_test_status(file_map):
        """Print whether testing is ON or OFF for every device in *file_map*.

        The device name shown is the sixth "/"-separated component of the
        device path, i.e. the entry directly below the debugfs hyperv root.
        """
        for device in file_map:
                state = get_test_state(locate_state(device, file_map))
                # Compare by value: "is 1" tests object identity, which only
                # works by accident of integer caching and triggers a
                # SyntaxWarning on newer Python versions.
                if (state == 1):
                        print("Testing = ON for: {}"
                              .format(device.split("/")[5]))
                else:
                        print("Testing = OFF for: {}"
                              .format(device.split("/")[5]))
0173 
0174 # read the vmbus device files, path must be absolute path before calling
def read_test_files(path):
        """Return the integer stored on the first line of the file at *path*.

        Prints a message and exits with status -1 when the file cannot be
        read or when its first line is not an integer.
        """
        try:
                with open(path, "r") as handle:
                        first_line = handle.readline()
                return int(first_line.strip())

        except IOError as err:
                code, reason = err.args
                print("I/O error({0}): {1} on file {2}"
                      .format(code, reason, path))
                exit(-1)
        except ValueError:
                print ("Element to int conversion error in: \n{}".format(path))
                exit(-1)
0189 
0190 # writing to vmbus device files, path must be absolute path before calling
def write_test_files(path, value):
        """Overwrite the file at *path* with the text form of *value*.

        Prints a message and exits with status -1 when the file cannot be
        opened or written.
        """
        text = "{}".format(value)
        try:
                with open(path, "w") as handle:
                        handle.write(text)
        except IOError as err:
                code, reason = err.args
                print("I/O error({0}): {1} on file {2}"
                      .format(code, reason, path))
                exit(-1)
0201 
0202 # set testing state of device
def set_test_state(state_path, state_value, quiet):
        """Write *state_value* to a device's state file and report the result.

        state_path:  path of the device's test-state file.
        state_value: value to store (1 turns testing on, 0 turns it off).
        quiet:       when true, nothing is printed.

        The state is read back from the file after the write, so the
        message reflects what the file actually holds.
        """
        write_test_files(state_path, state_value)
        # Compare by value: "is 1" tests object identity, which only works
        # by accident of integer caching and triggers a SyntaxWarning on
        # newer Python versions.
        testing_on = get_test_state(state_path) == 1
        if (quiet):
                return
        if (testing_on):
                print("Testing = ON for device: {}"
                      .format(state_path.split("/")[5]))
        else:
                print("Testing = OFF for device: {}"
                      .format(state_path.split("/")[5]))
0214 
0215 # get testing state of device
def get_test_state(state_path):
        """Return the integer held in the state file at *state_path*.

        1 means testing is ON, 0 means testing is OFF.
        """
        current_state = read_test_files(state_path)
        return current_state
0220 
0221 # write 1 - 1000 microseconds, into a single device using the
0222 # fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay
0223 # debugfs attributes
def set_delay_values(device, file_map, delay_length, quiet):
        """Write the requested delays into one device's delay files.

        delay_length[0] is the buffer interrupt delay and delay_length[1]
        the message delay. A value outside 0..1000 (such as -1) leaves the
        matching file untouched. Unless *quiet* is set, the values now held
        in both files are printed afterwards.
        """
        try:
                interrupt = file_map[device][f_names.buff_f.value]
                message = file_map[device][f_names.mess_f.value]

                # Index 0 goes to the interrupt file, index 1 to the
                # message file, in that order.
                for index, target in enumerate((interrupt, message)):
                        if (0 <= delay_length[index] <= 1000):
                                write_test_files(target, delay_length[index])

                if (not quiet):
                        reports = (
                                ("Buffer delay testing = {} for: {}", interrupt),
                                ("Message delay testing = {} for: {}", message),
                        )
                        for template, target in reports:
                                print(template.format(read_test_files(target),
                                                      target.split("/")[5]))
        except IOError as e:
                errno, strerror = e.args
                print("I/O error({0}): {1} on files {2}{3}"
                      .format(errno, strerror, interrupt, message))
                exit(-1)
0247 
0248 # enabling delay testing on all devices
def set_delay_all_devices(file_map, delay, quiet):
        """Turn testing on and apply *delay* for every device in *file_map*."""
        for device in file_map:
                state_file = locate_state(device, file_map)
                set_test_state(state_file, dev_state.on.value, quiet)
                set_delay_values(device, file_map, delay, quiet)
0256 
0257 # disable all testing on a SINGLE device.
def disable_testing_single_device(device, file_map):
        """Write the "off" value into every file recorded for *device*.

        This covers the state file as well as the test-method files, then
        a confirmation naming the device is printed.
        """
        for file_location in file_map[device].values():
                write_test_files(file_location, dev_state.off.value)
        device_name = device.split("/")[-1]
        print("ALL testing now OFF for {}".format(device_name))
0264 
0265 # disable all testing on ALL devices
def disable_all_testing(file_map):
        """Disable all testing on each device listed in *file_map*."""
        for device_path in file_map.keys():
                disable_testing_single_device(device_path, file_map)
0270 
def parse_args():
        """Build the vmbus_testing command line parser and parse the arguments.

        The chosen sub-command is stored in the "action" attribute of the
        returned namespace. Sub-commands: delay, disable_all (D),
        disable_single (d), view_all (V) and view_single (v).
        """
        prog_name = "vmbus_testing"

        main_usage = ("\n"
                "%(prog)s [delay]   [-h] [-e|-E] -t [-p]\n"
                "%(prog)s [view_all       | V]      [-h]\n"
                "%(prog)s [disable_all    | D]      [-h]\n"
                "%(prog)s [disable_single | d]      [-h|-p]\n"
                "%(prog)s [view_single    | v]      [-h|-p]\n"
                "%(prog)s --version\n")
        main_description = ("\nUse lsvmbus to get vmbus device type "
                "information.\n" "\nThe debugfs root path is "
                "/sys/kernel/debug/hyperv")

        parser = argparse.ArgumentParser(
                prog=prog_name,
                usage=main_usage,
                description=main_description,
                formatter_class=RawDescriptionHelpFormatter)
        subparsers = parser.add_subparsers(dest="action")
        parser.add_argument("--version", action="version",
                version='%(prog)s 0.1.0')
        parser.add_argument("-q", "--quiet", action="store_true",
                help=("silence none important test messages."
                      " This will only work when enabling testing"
                      " on a device."))

        # --path lives in its own parent parser so that several
        # sub-commands can share it.
        path_parser = argparse.ArgumentParser(add_help=False)
        path_parser.add_argument("-p", "--path", metavar="",
                help=("Debugfs path to a vmbus device. The path "
                      "must be the absolute path to the device."))

        # Likewise for the enable switches: exactly one of them has to be
        # given to a sub-command that inherits this parser.
        state_parser = argparse.ArgumentParser(add_help=False)
        state_group = state_parser.add_mutually_exclusive_group(required=True)
        state_group.add_argument("-E", "--enable_all",
                action="store_const", const="enable_all",
                help=("Enable the specified test type "
                      "on ALL vmbus devices."))
        state_group.add_argument("-e", "--enable_single",
                action="store_const", const="enable_single",
                help=("Enable the specified test type on a "
                      "SINGLE vmbus device."))

        # delay
        delay_usage = ("%(prog)s [-h]\n"
                "%(prog)s -E -t [value] [value]\n"
                "%(prog)s -e -t [value] [value] -p")
        parser_delay = subparsers.add_parser("delay",
                parents=[state_parser, path_parser],
                prog=prog_name,
                usage=delay_usage,
                help=("Delay the ring buffer interrupt or the "
                      "ring buffer message reads in microseconds."),
                description=("Delay the ring buffer interrupt for "
                      "vmbus devices, or delay the ring buffer message "
                      "reads for vmbus devices (both in microseconds). This "
                      "is only on the host to guest channel."))
        parser_delay.add_argument("-t", "--delay_time", metavar="",
                nargs=2, type=check_range, required=True, default=[0, 0],
                help=("Set [buffer] & [message] delay time. "
                      "Value constraints: -1 == value "
                      "or 0 < value <= 1000.\n"
                      "Use -1 to keep the previous value for that delay "
                      "type, or a value > 0 <= 1000 to change the delay "
                      "time."))

        # disable_all | D
        subparsers.add_parser("disable_all", aliases=['D'],
                prog=prog_name,
                usage=("%(prog)s [disable_all | D] -h\n"
                       "%(prog)s [disable_all | D]\n"),
                help="Disable ALL testing on ALL vmbus devices.",
                description=("Disable ALL testing on ALL vmbus "
                             "devices."))

        # disable_single | d
        subparsers.add_parser("disable_single", aliases=['d'],
                parents=[path_parser],
                prog=prog_name,
                usage=("%(prog)s [disable_single | d] -h\n"
                       "%(prog)s [disable_single | d] -p\n"),
                help="Disable ALL testing on a SINGLE vmbus device.",
                description=("Disable ALL testing on a SINGLE vmbus "
                             "device."))

        # view_all | V
        subparsers.add_parser("view_all", aliases=['V'],
                prog=prog_name,
                usage=("%(prog)s [view_all | V] -h\n"
                       "%(prog)s [view_all | V]\n"),
                help="View the test state for ALL vmbus devices.",
                description=("This shows the test state for ALL the "
                             "vmbus devices."))

        # view_single | v
        subparsers.add_parser("view_single", aliases=['v'],
                parents=[path_parser],
                prog=prog_name,
                usage=("%(prog)s [view_single | v] -h\n"
                       "%(prog)s [view_single | v] -p"),
                help=("View the test values for a SINGLE vmbus "
                      "device."),
                description=("This shows the test values for a SINGLE "
                             "vmbus device."))

        return parser.parse_args()
0361 
0362 # value checking for range checking input in parser
def check_range(arg1):
        """argparse "type" callback that validates one delay value.

        arg1: the raw command line string.

        Returns the value as an int when it is -1 or lies in 1..1000.
        Raises argparse.ArgumentTypeError when the string is not an
        integer or the value is out of range. 0 is rejected here as well,
        in line with the error message and the -t help text, rather than
        being let through to fail later in the program.
        """
        try:
                val = int(arg1)
        except ValueError as err:
                raise argparse.ArgumentTypeError(str(err)) from err
        if val != -1 and not (0 < val <= 1000):
                message = ("\n\nvalue must be -1 or  0 < value <= 1000. "
                           "Value program received: {}\n").format(val)
                raise argparse.ArgumentTypeError(message)
        return val
0374 
# Run the tool only when this file is executed as a script.
if __name__ == "__main__":
        main()