0001
0002
from argparse import ArgumentParser
from argparse import FileType
import errno
import logging
import os
import struct
import sys
import unittest

import tpm2
from tpm2 import ProtocolError
0012
class SmokeTest(unittest.TestCase):
    """Smoke tests for seal/unseal and for raw command I/O on a TPM client.

    Each test runs against its own ``tpm2.Client`` and root key, created in
    ``setUp`` and released again once the test has finished.
    """

    def setUp(self):
        """Open a client and create the root key used as the sealing parent."""
        self.client = tpm2.Client()
        # Registered before anything else can fail, so the client is closed
        # even when create_root_key() or tearDown() raises.
        self.addCleanup(self.client.close)
        self.root_key = self.client.create_root_key()

    def tearDown(self):
        """Flush the root key; the client itself is closed by the cleanup."""
        self.client.flush_context(self.root_key)

    def determine_bank_alg(self, mask):
        """Return a PCR bank algorithm whose selection covers ``mask``.

        Args:
            mask: Bit mask of the PCRs that must be present in the bank.

        Returns:
            The first bank algorithm reported by ``get_cap_pcrs()`` whose
            selection has every bit of ``mask`` set, or ``None`` if there is
            no such bank.
        """
        pcr_banks = self.client.get_cap_pcrs()
        for bank_alg, pcr_selection in pcr_banks.items():
            if pcr_selection & mask == mask:
                return bank_alg
        return None

    def _trial_policy_digest(self, pcrs, bank_alg):
        """Compute the PCR + password policy digest in a trial session.

        The trial session is always flushed, whether or not the policy
        commands succeed.
        """
        handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
        try:
            self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
            self.client.policy_password(handle)
            return self.client.get_policy_digest(handle)
        finally:
            self.client.flush_context(handle)

    def _unseal_with_policy(self, blob, auth, pcrs, bank_alg):
        """Unseal ``blob`` through a new PCR + password policy session.

        The session is flushed only when something raises, and the exception
        is then re-raised unchanged.
        NOTE(review): on success the session is left alone, presumably
        because the TPM releases a policy session once it has been used for
        a successful command -- confirm against the tpm2 client.
        """
        handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
        try:
            self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
            self.client.policy_password(handle)
            return self.client.unseal(self.root_key, blob, auth, handle)
        except BaseException:
            # Clean up on *any* failure (including interrupts), then let the
            # original exception propagate to the caller.
            self.client.flush_context(handle)
            raise

    @staticmethod
    def _get_random_cmd():
        """Pack a session-less TPM2_CC_GET_RANDOM command with parameter 0x20.

        The tests using it expect 32 bytes of payload in the response.
        """
        fmt = '>HIIH'
        return struct.pack(fmt,
                           tpm2.TPM2_ST_NO_SESSIONS,
                           struct.calcsize(fmt),
                           tpm2.TPM2_CC_GET_RANDOM,
                           0x20)

    def test_seal_with_auth(self):
        """Data sealed with an auth value unseals with the same auth value."""
        data = b'X' * 64
        auth = b'A' * 15

        blob = self.client.seal(self.root_key, data, auth, None)
        result = self.client.unseal(self.root_key, blob, auth, None)
        self.assertEqual(data, result)

    def test_seal_with_policy(self):
        """Data sealed to a PCR 16 + password policy unseals under it."""
        bank_alg = self.determine_bank_alg(1 << 16)
        self.assertIsNotNone(bank_alg)

        data = b'X' * 64
        auth = b'A' * 15
        pcrs = [16]

        policy_dig = self._trial_policy_digest(pcrs, bank_alg)
        blob = self.client.seal(self.root_key, data, auth, policy_dig)

        result = self._unseal_with_policy(blob, auth, pcrs, bank_alg)
        self.assertEqual(data, result)

    def test_unseal_with_wrong_auth(self):
        """Unsealing with a modified auth value fails with RC_AUTH_FAIL."""
        data = b'X' * 64
        auth = b'A' * 20

        blob = self.client.seal(self.root_key, data, auth, None)

        # Same length as the real auth value, but the last byte differs.
        wrong_auth = auth[:-1] + b'B'
        with self.assertRaises(ProtocolError) as cm:
            self.client.unseal(self.root_key, blob, wrong_auth, None)
        self.assertEqual(cm.exception.rc, tpm2.TPM2_RC_AUTH_FAIL)

    def test_unseal_with_wrong_policy(self):
        """Extending a policy PCR after sealing makes the unseal fail."""
        # The bank must contain both PCR 16 (in the policy) and PCR 1 (not
        # in the policy), because the test extends both of them.
        bank_alg = self.determine_bank_alg(1 << 16 | 1 << 1)
        self.assertIsNotNone(bank_alg)

        data = b'X' * 64
        auth = b'A' * 17
        pcrs = [16]

        policy_dig = self._trial_policy_digest(pcrs, bank_alg)
        blob = self.client.seal(self.root_key, data, auth, policy_dig)

        # First extend a PCR that is not part of the policy: the unseal is
        # still expected to succeed.
        ds = tpm2.get_digest_size(bank_alg)
        self.client.extend_pcr(1, b'X' * ds, bank_alg=bank_alg)

        result = self._unseal_with_policy(blob, auth, pcrs, bank_alg)
        self.assertEqual(data, result)

        # Then extend the PCR the policy is bound to: the unseal must now be
        # refused with a policy failure.
        self.client.extend_pcr(16, b'X' * ds, bank_alg=bank_alg)

        with self.assertRaises(ProtocolError) as cm:
            self._unseal_with_policy(blob, auth, pcrs, bank_alg)
        self.assertEqual(cm.exception.rc, tpm2.TPM2_RC_POLICY_FAIL)

    def test_seal_with_too_long_auth(self):
        """An auth value one byte longer than a SHA-1 digest gives RC_SIZE."""
        ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1)
        data = b'X' * 64
        auth = b'A' * (ds + 1)

        with self.assertRaises(ProtocolError) as cm:
            self.client.seal(self.root_key, data, auth, None)
        self.assertEqual(cm.exception.rc, tpm2.TPM2_RC_SIZE)

    def test_too_short_cmd(self):
        """A command shorter than its own size field raises IOError."""
        fmt = '>HIII'
        # The size field claims one byte more than is actually packed.
        cmd = struct.pack(fmt,
                          tpm2.TPM2_ST_NO_SESSIONS,
                          struct.calcsize(fmt) + 1,
                          tpm2.TPM2_CC_FLUSH_CONTEXT,
                          0xDEADBEEF)

        # Any other exception type propagates and is reported as-is instead
        # of being hidden behind a bare "False != True" failure.
        with self.assertRaises(IOError):
            self.client.send_cmd(cmd)

    def test_read_partial_resp(self):
        """A response can be read as a 10-byte header plus the remainder."""
        self.client.tpm.write(self._get_random_cmd())

        # Header layout used here: 2-byte tag, 4-byte size, 4-byte code.
        hdr = self.client.tpm.read(10)
        sz = struct.unpack('>I', hdr[2:6])[0]
        rsp = self.client.tpm.read()

        # Total size = header + 2-byte length + 32 bytes of payload; the
        # second read returns only what the first one left behind.
        self.assertEqual(sz, 10 + 2 + 32)
        self.assertEqual(len(rsp), 2 + 32)

    def test_read_partial_overwrite(self):
        """A new command after a partial read yields a complete response."""
        cmd = self._get_random_cmd()
        self.client.tpm.write(cmd)

        # Read only part of the first response ...
        rsp1 = self.client.tpm.read(15)

        # ... send a new command ...
        self.client.tpm.write(cmd)

        # ... and read the whole response to that second command.
        rsp2 = self.client.tpm.read()

        self.assertEqual(len(rsp1), 15)
        self.assertEqual(len(rsp2), 10 + 2 + 32)

    def test_send_two_cmds(self):
        """Writing two commands without reading in between raises IOError.

        NOTE(review): presumably it is the second write that is rejected
        while the first response is still pending -- the test only requires
        that an IOError occurs somewhere in the sequence.
        """
        cmd = self._get_random_cmd()
        rejected = False
        try:
            self.client.tpm.write(cmd)
            self.client.tpm.write(cmd)
            self.client.tpm.read()
        except IOError:
            # Read the outstanding response before the test finishes.
            self.client.tpm.read()
            rejected = True
        self.assertTrue(rejected)
0237
class SpaceTest(unittest.TestCase):
    """Tests using ``tpm2.Client`` instances opened with ``FLAG_SPACE``.

    Debug output goes to ``SpaceTest.log``.
    """

    def setUp(self):
        """Send debug logging to ``SpaceTest.log``."""
        logging.basicConfig(filename='SpaceTest.log', level=logging.DEBUG)

    def _open_space(self):
        """Open a ``FLAG_SPACE`` client that is closed when the test ends.

        Registering ``close`` as a cleanup releases the client even when the
        test fails part-way through.
        """
        space = tpm2.Client(tpm2.Client.FLAG_SPACE)
        self.addCleanup(space.close)
        return space

    def test_make_two_spaces(self):
        """Root keys can be created in two clients opened side by side."""
        log = logging.getLogger(__name__)
        log.debug("test_make_two_spaces")

        space1 = self._open_space()
        root1 = space1.create_root_key()
        space2 = self._open_space()
        root2 = space2.create_root_key()
        root3 = space2.create_root_key()

        for root in (root1, root2, root3):
            log.debug("%08x" % (root))

    def test_flush_context(self):
        """A root key created through a space client can be flushed."""
        log = logging.getLogger(__name__)
        log.debug("test_flush_context")

        space1 = self._open_space()
        root1 = space1.create_root_key()
        log.debug("%08x" % (root1))

        space1.flush_context(root1)

    def test_get_handles(self):
        """The transient-handle query reports only the client's own keys.

        One key is created through the first client and two through the
        second; querying through the second is expected to return exactly
        two handles.
        """
        log = logging.getLogger(__name__)
        log.debug("test_get_handles")

        space1 = self._open_space()
        space1.create_root_key()
        space2 = self._open_space()
        space2.create_root_key()
        space2.create_root_key()

        handles = space2.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_TRANSIENT)

        self.assertEqual(len(handles), 2)

        log.debug("%08x" % (handles[0]))
        log.debug("%08x" % (handles[1]))

    def test_invalid_cc(self):
        """A command code just below ``TPM2_CC_FIRST`` is rejected.

        The expected return code is ``TPM2_RC_COMMAND_CODE`` combined with
        ``TSS2_RESMGR_TPM_RC_LAYER``.
        """
        log = logging.getLogger(__name__)
        log.debug(sys._getframe().f_code.co_name)

        TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 1

        space1 = self._open_space()
        root1 = space1.create_root_key()
        log.debug("%08x" % (root1))

        # Header-only command: tag, total size, (invalid) command code.
        fmt = '>HII'
        cmd = struct.pack(fmt, tpm2.TPM2_ST_NO_SESSIONS, struct.calcsize(fmt),
                          TPM2_CC_INVALID)

        with self.assertRaises(ProtocolError) as cm:
            space1.send_cmd(cmd)

        self.assertEqual(cm.exception.rc, tpm2.TPM2_RC_COMMAND_CODE |
                         tpm2.TSS2_RESMGR_TPM_RC_LAYER)
0305
class AsyncTest(unittest.TestCase):
    """Tests using ``tpm2.Client`` instances opened with ``FLAG_NONBLOCK``.

    Debug output goes to ``AsyncTest.log``.
    """

    def setUp(self):
        """Send debug logging to ``AsyncTest.log``."""
        logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG)

    def test_async(self):
        """A capability query completes on a non-blocking client."""
        log = logging.getLogger(__name__)
        log.debug(sys._getframe().f_code.co_name)

        async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK)
        # Close the client even if the query below raises.
        self.addCleanup(async_client.close)

        log.debug("Calling get_cap in a NON_BLOCKING mode")
        async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION)

    def test_flush_invalid_context(self):
        """Flushing an invalid handle raises ``OSError`` with ``EINVAL``."""
        log = logging.getLogger(__name__)
        log.debug(sys._getframe().f_code.co_name)

        async_client = tpm2.Client(tpm2.Client.FLAG_SPACE | tpm2.Client.FLAG_NONBLOCK)
        # Close the client even if the call or the assertion below fails.
        self.addCleanup(async_client.close)

        log.debug("Calling flush_context passing in an invalid handle ")
        handle = 0x80123456

        with self.assertRaises(OSError) as cm:
            async_client.flush_context(handle)

        # errno.EINVAL replaces the literal 22 the check used before.
        self.assertEqual(cm.exception.errno, errno.EINVAL)