0001
0002
0003
0004 import collections
0005 import functools
0006 import json
0007 import os
0008 import socket
0009 import subprocess
0010 import unittest
0011
0012
0013
0014 cur_dir = os.path.dirname(os.path.realpath(__file__))
0015 bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
0016 "tools", "bpf", "bpftool"))
0017 os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"]
0018
0019
0020 class IfaceNotFoundError(Exception):
0021 pass
0022
0023
0024 class UnprivilegedUserError(Exception):
0025 pass
0026
0027
0028 def _bpftool(args, json=True):
0029 _args = ["bpftool"]
0030 if json:
0031 _args.append("-j")
0032 _args.extend(args)
0033
0034 return subprocess.check_output(_args)
0035
0036
0037 def bpftool(args):
0038 return _bpftool(args, json=False).decode("utf-8")
0039
0040
0041 def bpftool_json(args):
0042 res = _bpftool(args)
0043 return json.loads(res)
0044
0045
0046 def get_default_iface():
0047 for iface in socket.if_nameindex():
0048 if iface[1] != "lo":
0049 return iface[1]
0050 raise IfaceNotFoundError("Could not find any network interface to probe")
0051
0052
0053 def default_iface(f):
0054 @functools.wraps(f)
0055 def wrapper(*args, **kwargs):
0056 iface = get_default_iface()
0057 return f(*args, iface, **kwargs)
0058 return wrapper
0059
0060 DMESG_EMITTING_HELPERS = [
0061 "bpf_probe_write_user",
0062 "bpf_trace_printk",
0063 "bpf_trace_vprintk",
0064 ]
0065
0066 class TestBpftool(unittest.TestCase):
0067 @classmethod
0068 def setUpClass(cls):
0069 if os.getuid() != 0:
0070 raise UnprivilegedUserError(
0071 "This test suite needs root privileges")
0072
0073 @default_iface
0074 def test_feature_dev_json(self, iface):
0075 unexpected_helpers = DMESG_EMITTING_HELPERS
0076 expected_keys = [
0077 "syscall_config",
0078 "program_types",
0079 "map_types",
0080 "helpers",
0081 "misc",
0082 ]
0083
0084 res = bpftool_json(["feature", "probe", "dev", iface])
0085
0086 self.assertCountEqual(res.keys(), expected_keys)
0087
0088
0089 for helpers in res["helpers"].values():
0090 for unexpected_helper in unexpected_helpers:
0091 self.assertNotIn(unexpected_helper, helpers)
0092
0093 def test_feature_kernel(self):
0094 test_cases = [
0095 bpftool_json(["feature", "probe", "kernel"]),
0096 bpftool_json(["feature", "probe"]),
0097 bpftool_json(["feature"]),
0098 ]
0099 unexpected_helpers = DMESG_EMITTING_HELPERS
0100 expected_keys = [
0101 "syscall_config",
0102 "system_config",
0103 "program_types",
0104 "map_types",
0105 "helpers",
0106 "misc",
0107 ]
0108
0109 for tc in test_cases:
0110
0111 self.assertCountEqual(tc.keys(), expected_keys)
0112
0113
0114 for helpers in tc["helpers"].values():
0115 for unexpected_helper in unexpected_helpers:
0116 self.assertNotIn(unexpected_helper, helpers)
0117
0118 def test_feature_kernel_full(self):
0119 test_cases = [
0120 bpftool_json(["feature", "probe", "kernel", "full"]),
0121 bpftool_json(["feature", "probe", "full"]),
0122 ]
0123 expected_helpers = DMESG_EMITTING_HELPERS
0124
0125 for tc in test_cases:
0126
0127
0128
0129
0130
0131 found_helpers = False
0132
0133 for helpers in tc["helpers"].values():
0134 if all(expected_helper in helpers
0135 for expected_helper in expected_helpers):
0136 found_helpers = True
0137 break
0138
0139 self.assertTrue(found_helpers)
0140
0141 def test_feature_kernel_full_vs_not_full(self):
0142 full_res = bpftool_json(["feature", "probe", "full"])
0143 not_full_res = bpftool_json(["feature", "probe"])
0144 not_full_set = set()
0145 full_set = set()
0146
0147 for helpers in full_res["helpers"].values():
0148 for helper in helpers:
0149 full_set.add(helper)
0150
0151 for helpers in not_full_res["helpers"].values():
0152 for helper in helpers:
0153 not_full_set.add(helper)
0154
0155 self.assertCountEqual(full_set - not_full_set,
0156 set(DMESG_EMITTING_HELPERS))
0157 self.assertCountEqual(not_full_set - full_set, set())
0158
0159 def test_feature_macros(self):
0160 expected_patterns = [
0161 r"/\*\*\* System call availability \*\*\*/",
0162 r"#define HAVE_BPF_SYSCALL",
0163 r"/\*\*\* eBPF program types \*\*\*/",
0164 r"#define HAVE.*PROG_TYPE",
0165 r"/\*\*\* eBPF map types \*\*\*/",
0166 r"#define HAVE.*MAP_TYPE",
0167 r"/\*\*\* eBPF helper functions \*\*\*/",
0168 r"#define HAVE.*HELPER",
0169 r"/\*\*\* eBPF misc features \*\*\*/",
0170 ]
0171
0172 res = bpftool(["feature", "probe", "macros"])
0173 for pattern in expected_patterns:
0174 self.assertRegex(res, pattern)