Back to home page

OSCL-LXR

 
 

    


0001 # SPDX-License-Identifier: GPL-2.0
0002 # Copyright (c) 2020 SUSE LLC.
0003 
0004 import collections
0005 import functools
0006 import json
0007 import os
0008 import socket
0009 import subprocess
0010 import unittest
0011 
0012 
# Make the in-tree bpftool build directory and /usr/local/sbin take priority
# over the directories already listed in PATH.
cur_dir = os.path.dirname(os.path.realpath(__file__))
# bpftool lives four directory levels up from this file, under tools/bpf.
bpftool_dir = os.path.abspath(
    os.path.join(cur_dir, *([os.pardir] * 4), "tools", "bpf", "bpftool")
)
os.environ["PATH"] = ":".join(
    [bpftool_dir, "/usr/local/sbin", os.environ["PATH"]]
)
0018 
0019 
class IfaceNotFoundError(Exception):
    """Signals that no network interface other than "lo" could be found."""
0022 
0023 
class UnprivilegedUserError(Exception):
    """Signals that the test suite was started without root privileges."""
0026 
0027 
def _bpftool(args, json=True):
    """Run ``bpftool`` with *args* and return its raw standard output.

    When *json* is true, the ``-j`` flag is placed before *args*.
    The command is run through ``subprocess.check_output``, so a non-zero
    exit status raises ``subprocess.CalledProcessError``.
    """
    # NOTE: the parameter name shadows the json module inside this function;
    # it is kept because callers pass it by keyword.
    flags = ["-j"] if json else []
    command = ["bpftool", *flags, *args]
    return subprocess.check_output(command)
0035 
0036 
def bpftool(args):
    """Run ``bpftool`` without the JSON flag and return its output as text.

    The bytes produced by the command are decoded as UTF-8.
    """
    raw_output = _bpftool(args, json=False)
    return raw_output.decode("utf-8")
0039 
0040 
def bpftool_json(args):
    """Run ``bpftool -j`` with *args* and return the parsed JSON output."""
    return json.loads(_bpftool(args, json=True))
0044 
0045 
def get_default_iface():
    """Return the name of the first network interface that is not "lo".

    Interfaces are examined in the order reported by
    ``socket.if_nameindex()``.

    Raises:
        IfaceNotFoundError: if every reported interface is named "lo" or no
            interface is reported at all.
    """
    candidates = (
        name for _index, name in socket.if_nameindex() if name != "lo"
    )
    chosen = next(candidates, None)
    if chosen is None:
        raise IfaceNotFoundError(
            "Could not find any network interface to probe")
    return chosen
0051 
0052 
def default_iface(f):
    """Decorator that appends the default interface name to the call.

    The wrapped callable receives the value of ``get_default_iface()`` as one
    extra positional argument, placed after the positional arguments supplied
    by the caller; keyword arguments are passed through unchanged.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Look up the interface at call time, not at decoration time.
        positional = args + (get_default_iface(),)
        return f(*positional, **kwargs)

    return wrapper
0059 
# Helpers that the tests in this file expect to be missing from a default
# "feature probe" and present only when a "full" probe is requested.
# (Going by the name, these presumably cause kernel log output when probed.)
DMESG_EMITTING_HELPERS = [
    "bpf_probe_write_user",
    "bpf_trace_printk",
    "bpf_trace_vprintk",
]
0065 
class TestBpftool(unittest.TestCase):
    """Checks on the output of the ``bpftool feature`` subcommands."""

    @classmethod
    def setUpClass(cls):
        """Refuse to run the suite unless ``os.getuid()`` reports 0."""
        if os.getuid() == 0:
            return
        raise UnprivilegedUserError("This test suite needs root privileges")

    def _assert_dmesg_helpers_absent(self, probe):
        """Assert that no list under ``probe["helpers"]`` contains any of the
        helpers named in DMESG_EMITTING_HELPERS."""
        for available in probe["helpers"].values():
            for helper_name in DMESG_EMITTING_HELPERS:
                self.assertNotIn(helper_name, available)

    @default_iface
    def test_feature_dev_json(self, iface):
        """Probe a device: expected top-level keys, no dmesg helpers."""
        probe = bpftool_json(["feature", "probe", "dev", iface])

        # The result must have exactly these top-level keys.
        self.assertCountEqual(
            probe.keys(),
            ["syscall_config", "program_types", "map_types", "helpers",
             "misc"],
        )
        # None of the dmesg-emitting helpers may show up in the helper
        # probe results.
        self._assert_dmesg_helpers_absent(probe)

    def test_feature_kernel(self):
        """Probe the kernel via the three equivalent command spellings."""
        invocations = (
            ["feature", "probe", "kernel"],
            ["feature", "probe"],
            ["feature"],
        )
        # Run every command up front, before any assertion is evaluated.
        probes = [bpftool_json(argv) for argv in invocations]
        sections = [
            "syscall_config",
            "system_config",
            "program_types",
            "map_types",
            "helpers",
            "misc",
        ]

        for probe in probes:
            # The result must have exactly these top-level keys.
            self.assertCountEqual(probe.keys(), sections)
            # None of the dmesg-emitting helpers may show up in the helper
            # probe results.
            self._assert_dmesg_helpers_absent(probe)

    def test_feature_kernel_full(self):
        """A full kernel probe reports the dmesg-emitting helpers."""
        probes = [
            bpftool_json(["feature", "probe", "kernel", "full"]),
            bpftool_json(["feature", "probe", "full"]),
        ]

        for probe in probes:
            # At least one helper list must contain every dmesg-emitting
            # helper. We cannot assume they are listed for all program types
            # or for a specific subset of them: that depends on the kernel
            # version and configuration.
            reported = any(
                all(name in available for name in DMESG_EMITTING_HELPERS)
                for available in probe["helpers"].values()
            )
            self.assertTrue(reported)

    def test_feature_kernel_full_vs_not_full(self):
        """Full and default probes differ only by the dmesg helpers."""
        full_probe = bpftool_json(["feature", "probe", "full"])
        default_probe = bpftool_json(["feature", "probe"])

        # Flatten the per-key helper lists into one set per probe.
        full_helpers = {
            name
            for available in full_probe["helpers"].values()
            for name in available
        }
        default_helpers = {
            name
            for available in default_probe["helpers"].values()
            for name in available
        }

        self.assertCountEqual(full_helpers - default_helpers,
                              set(DMESG_EMITTING_HELPERS))
        self.assertCountEqual(default_helpers - full_helpers, set())

    def test_feature_macros(self):
        """The ``macros`` output has every section header and define."""
        output = bpftool(["feature", "probe", "macros"])

        section_patterns = (
            r"/\*\*\* System call availability \*\*\*/",
            r"#define HAVE_BPF_SYSCALL",
            r"/\*\*\* eBPF program types \*\*\*/",
            r"#define HAVE.*PROG_TYPE",
            r"/\*\*\* eBPF map types \*\*\*/",
            r"#define HAVE.*MAP_TYPE",
            r"/\*\*\* eBPF helper functions \*\*\*/",
            r"#define HAVE.*HELPER",
            r"/\*\*\* eBPF misc features \*\*\*/",
        )
        for regex in section_patterns:
            self.assertRegex(output, regex)