Back to home page

OSCL-LXR

 
 

    


0001 # SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause)
0002 
0003 from argparse import ArgumentParser
0004 from argparse import FileType
0005 import os
0006 import sys
0007 import tpm2
0008 from tpm2 import ProtocolError
0009 import unittest
0010 import logging
0011 import struct
0012 
0013 class SmokeTest(unittest.TestCase):
0014     def setUp(self):
0015         self.client = tpm2.Client()
0016         self.root_key = self.client.create_root_key()
0017 
0018     def tearDown(self):
0019         self.client.flush_context(self.root_key)
0020         self.client.close()
0021 
0022     def test_seal_with_auth(self):
0023         data = ('X' * 64).encode()
0024         auth = ('A' * 15).encode()
0025 
0026         blob = self.client.seal(self.root_key, data, auth, None)
0027         result = self.client.unseal(self.root_key, blob, auth, None)
0028         self.assertEqual(data, result)
0029 
0030     def determine_bank_alg(self, mask):
0031         pcr_banks = self.client.get_cap_pcrs()
0032         for bank_alg, pcrSelection in pcr_banks.items():
0033             if pcrSelection & mask == mask:
0034                 return bank_alg
0035         return None
0036 
0037     def test_seal_with_policy(self):
0038         bank_alg = self.determine_bank_alg(1 << 16)
0039         self.assertIsNotNone(bank_alg)
0040 
0041         handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
0042 
0043         data = ('X' * 64).encode()
0044         auth = ('A' * 15).encode()
0045         pcrs = [16]
0046 
0047         try:
0048             self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
0049             self.client.policy_password(handle)
0050 
0051             policy_dig = self.client.get_policy_digest(handle)
0052         finally:
0053             self.client.flush_context(handle)
0054 
0055         blob = self.client.seal(self.root_key, data, auth, policy_dig)
0056 
0057         handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
0058 
0059         try:
0060             self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
0061             self.client.policy_password(handle)
0062 
0063             result = self.client.unseal(self.root_key, blob, auth, handle)
0064         except:
0065             self.client.flush_context(handle)
0066             raise
0067 
0068         self.assertEqual(data, result)
0069 
0070     def test_unseal_with_wrong_auth(self):
0071         data = ('X' * 64).encode()
0072         auth = ('A' * 20).encode()
0073         rc = 0
0074 
0075         blob = self.client.seal(self.root_key, data, auth, None)
0076         try:
0077             result = self.client.unseal(self.root_key, blob,
0078                         auth[:-1] + 'B'.encode(), None)
0079         except ProtocolError as e:
0080             rc = e.rc
0081 
0082         self.assertEqual(rc, tpm2.TPM2_RC_AUTH_FAIL)
0083 
0084     def test_unseal_with_wrong_policy(self):
0085         bank_alg = self.determine_bank_alg(1 << 16 | 1 << 1)
0086         self.assertIsNotNone(bank_alg)
0087 
0088         handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
0089 
0090         data = ('X' * 64).encode()
0091         auth = ('A' * 17).encode()
0092         pcrs = [16]
0093 
0094         try:
0095             self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
0096             self.client.policy_password(handle)
0097 
0098             policy_dig = self.client.get_policy_digest(handle)
0099         finally:
0100             self.client.flush_context(handle)
0101 
0102         blob = self.client.seal(self.root_key, data, auth, policy_dig)
0103 
0104         # Extend first a PCR that is not part of the policy and try to unseal.
0105         # This should succeed.
0106 
0107         ds = tpm2.get_digest_size(bank_alg)
0108         self.client.extend_pcr(1, ('X' * ds).encode(), bank_alg=bank_alg)
0109 
0110         handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
0111 
0112         try:
0113             self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
0114             self.client.policy_password(handle)
0115 
0116             result = self.client.unseal(self.root_key, blob, auth, handle)
0117         except:
0118             self.client.flush_context(handle)
0119             raise
0120 
0121         self.assertEqual(data, result)
0122 
0123         # Then, extend a PCR that is part of the policy and try to unseal.
0124         # This should fail.
0125         self.client.extend_pcr(16, ('X' * ds).encode(), bank_alg=bank_alg)
0126 
0127         handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
0128 
0129         rc = 0
0130 
0131         try:
0132             self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
0133             self.client.policy_password(handle)
0134 
0135             result = self.client.unseal(self.root_key, blob, auth, handle)
0136         except ProtocolError as e:
0137             rc = e.rc
0138             self.client.flush_context(handle)
0139         except:
0140             self.client.flush_context(handle)
0141             raise
0142 
0143         self.assertEqual(rc, tpm2.TPM2_RC_POLICY_FAIL)
0144 
0145     def test_seal_with_too_long_auth(self):
0146         ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1)
0147         data = ('X' * 64).encode()
0148         auth = ('A' * (ds + 1)).encode()
0149 
0150         rc = 0
0151         try:
0152             blob = self.client.seal(self.root_key, data, auth, None)
0153         except ProtocolError as e:
0154             rc = e.rc
0155 
0156         self.assertEqual(rc, tpm2.TPM2_RC_SIZE)
0157 
0158     def test_too_short_cmd(self):
0159         rejected = False
0160         try:
0161             fmt = '>HIII'
0162             cmd = struct.pack(fmt,
0163                               tpm2.TPM2_ST_NO_SESSIONS,
0164                               struct.calcsize(fmt) + 1,
0165                               tpm2.TPM2_CC_FLUSH_CONTEXT,
0166                               0xDEADBEEF)
0167 
0168             self.client.send_cmd(cmd)
0169         except IOError as e:
0170             rejected = True
0171         except:
0172             pass
0173         self.assertEqual(rejected, True)
0174 
0175     def test_read_partial_resp(self):
0176         try:
0177             fmt = '>HIIH'
0178             cmd = struct.pack(fmt,
0179                               tpm2.TPM2_ST_NO_SESSIONS,
0180                               struct.calcsize(fmt),
0181                               tpm2.TPM2_CC_GET_RANDOM,
0182                               0x20)
0183             self.client.tpm.write(cmd)
0184             hdr = self.client.tpm.read(10)
0185             sz = struct.unpack('>I', hdr[2:6])[0]
0186             rsp = self.client.tpm.read()
0187         except:
0188             pass
0189         self.assertEqual(sz, 10 + 2 + 32)
0190         self.assertEqual(len(rsp), 2 + 32)
0191 
0192     def test_read_partial_overwrite(self):
0193         try:
0194             fmt = '>HIIH'
0195             cmd = struct.pack(fmt,
0196                               tpm2.TPM2_ST_NO_SESSIONS,
0197                               struct.calcsize(fmt),
0198                               tpm2.TPM2_CC_GET_RANDOM,
0199                               0x20)
0200             self.client.tpm.write(cmd)
0201             # Read part of the respone
0202             rsp1 = self.client.tpm.read(15)
0203 
0204             # Send a new cmd
0205             self.client.tpm.write(cmd)
0206 
0207             # Read the whole respone
0208             rsp2 = self.client.tpm.read()
0209         except:
0210             pass
0211         self.assertEqual(len(rsp1), 15)
0212         self.assertEqual(len(rsp2), 10 + 2 + 32)
0213 
0214     def test_send_two_cmds(self):
0215         rejected = False
0216         try:
0217             fmt = '>HIIH'
0218             cmd = struct.pack(fmt,
0219                               tpm2.TPM2_ST_NO_SESSIONS,
0220                               struct.calcsize(fmt),
0221                               tpm2.TPM2_CC_GET_RANDOM,
0222                               0x20)
0223             self.client.tpm.write(cmd)
0224 
0225             # expect the second one to raise -EBUSY error
0226             self.client.tpm.write(cmd)
0227             rsp = self.client.tpm.read()
0228 
0229         except IOError as e:
0230             # read the response
0231             rsp = self.client.tpm.read()
0232             rejected = True
0233             pass
0234         except:
0235             pass
0236         self.assertEqual(rejected, True)
0237 
0238 class SpaceTest(unittest.TestCase):
0239     def setUp(self):
0240         logging.basicConfig(filename='SpaceTest.log', level=logging.DEBUG)
0241 
0242     def test_make_two_spaces(self):
0243         log = logging.getLogger(__name__)
0244         log.debug("test_make_two_spaces")
0245 
0246         space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
0247         root1 = space1.create_root_key()
0248         space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
0249         root2 = space2.create_root_key()
0250         root3 = space2.create_root_key()
0251 
0252         log.debug("%08x" % (root1))
0253         log.debug("%08x" % (root2))
0254         log.debug("%08x" % (root3))
0255 
0256     def test_flush_context(self):
0257         log = logging.getLogger(__name__)
0258         log.debug("test_flush_context")
0259 
0260         space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
0261         root1 = space1.create_root_key()
0262         log.debug("%08x" % (root1))
0263 
0264         space1.flush_context(root1)
0265 
0266     def test_get_handles(self):
0267         log = logging.getLogger(__name__)
0268         log.debug("test_get_handles")
0269 
0270         space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
0271         space1.create_root_key()
0272         space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
0273         space2.create_root_key()
0274         space2.create_root_key()
0275 
0276         handles = space2.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_TRANSIENT)
0277 
0278         self.assertEqual(len(handles), 2)
0279 
0280         log.debug("%08x" % (handles[0]))
0281         log.debug("%08x" % (handles[1]))
0282 
0283     def test_invalid_cc(self):
0284         log = logging.getLogger(__name__)
0285         log.debug(sys._getframe().f_code.co_name)
0286 
0287         TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 1
0288 
0289         space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
0290         root1 = space1.create_root_key()
0291         log.debug("%08x" % (root1))
0292 
0293         fmt = '>HII'
0294         cmd = struct.pack(fmt, tpm2.TPM2_ST_NO_SESSIONS, struct.calcsize(fmt),
0295                           TPM2_CC_INVALID)
0296 
0297         rc = 0
0298         try:
0299             space1.send_cmd(cmd)
0300         except ProtocolError as e:
0301             rc = e.rc
0302 
0303         self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE |
0304                          tpm2.TSS2_RESMGR_TPM_RC_LAYER)
0305 
0306 class AsyncTest(unittest.TestCase):
0307     def setUp(self):
0308         logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG)
0309 
0310     def test_async(self):
0311         log = logging.getLogger(__name__)
0312         log.debug(sys._getframe().f_code.co_name)
0313 
0314         async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK)
0315         log.debug("Calling get_cap in a NON_BLOCKING mode")
0316         async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION)
0317         async_client.close()
0318 
0319     def test_flush_invalid_context(self):
0320         log = logging.getLogger(__name__)
0321         log.debug(sys._getframe().f_code.co_name)
0322 
0323         async_client = tpm2.Client(tpm2.Client.FLAG_SPACE | tpm2.Client.FLAG_NONBLOCK)
0324         log.debug("Calling flush_context passing in an invalid handle ")
0325         handle = 0x80123456
0326         rc = 0
0327         try:
0328             async_client.flush_context(handle)
0329         except OSError as e:
0330             rc = e.errno
0331 
0332         self.assertEqual(rc, 22)
0333         async_client.close()