diff -up ./azure-keyvault/tests/test_certificates.py.orig ./azure-keyvault/tests/test_certificates.py --- ./azure-keyvault/tests/test_certificates.py.orig 2018-08-02 18:37:34.000000000 +0200 +++ ./azure-keyvault/tests/test_certificates.py 2019-04-16 17:16:30.786347635 +0200 @@ -156,78 +156,6 @@ class KeyVaultCertificateTest(KeyvaultTe @ResourceGroupPreparer() @KeyVaultPreparer() - def test_import(self, vault, **kwargs): - self.assertIsNotNone(vault) - vault_uri = vault.properties.vault_uri - cert_name = self.get_resource_name('certimp') - - # import certificate( - (cert_bundle, cert_policy) = self._import_common_certificate(vault_uri, cert_name) - self._validate_certificate_bundle(cert_bundle, vault_uri, cert_name, cert_policy) - - @ResourceGroupPreparer() - @KeyVaultPreparer() - def test_list(self, vault, **kwargs): - self.assertIsNotNone(vault) - vault_uri = vault.properties.vault_uri - - max_certificates = self.list_test_size - expected = {} - - # import some certificates - for x in range(0, max_certificates): - cert_name = self.get_resource_name('cert{}'.format(x)) - cert_bundle = None - error_count = 0 - while not cert_bundle: - try: - cert_bundle = self._import_common_certificate(vault_uri, cert_name)[0] - cid = KeyVaultId.parse_certificate_id(cert_bundle.id).base_id.strip('/') - expected[cid] = cert_bundle.attributes - except Exception as ex: - if hasattr(ex, 'message') and 'Throttled' in ex.message: - error_count += 1 - time.sleep(2.5 * error_count) - continue - else: - raise ex - - # list certificates - result = list(self.client.get_certificates(vault_uri, self.list_test_size)) - self._validate_certificate_list(result, expected) - - @ResourceGroupPreparer() - @KeyVaultPreparer() - def test_list_versions(self, vault, **kwargs): - self.assertIsNotNone(vault) - vault_uri = vault.properties.vault_uri - cert_name = self.get_resource_name('certver') - - max_certificates = self.list_test_size - expected = {} - - # import same certificates as different versions - for x in range(0, max_certificates): - cert_bundle = None - error_count = 0 - while not cert_bundle: - try: - cert_bundle = self._import_common_certificate(vault_uri, cert_name)[0] - cid = KeyVaultId.parse_certificate_id(cert_bundle.id).id.strip('/') - expected[cid] = cert_bundle.attributes - except Exception as ex: - if hasattr(ex, 'message') and 'Throttled' in ex.message: - error_count += 1 - time.sleep(2.5 * error_count) - continue - else: - raise ex - - # list certificate versions - self._validate_certificate_list(list(self.client.get_certificate_versions(vault_uri, cert_name)), expected) - - @ResourceGroupPreparer() - @KeyVaultPreparer() def test_crud_issuer(self, vault, **kwargs): self.assertIsNotNone(vault) vault_uri = vault.properties.vault_uri @@ -397,32 +325,6 @@ class KeyVaultCertificateTest(KeyvaultTe @ResourceGroupPreparer() @KeyVaultPreparer() - def test_policy(self, vault, **kwargs): - self.assertIsNotNone(vault) - vault_uri = vault.properties.vault_uri - - cert_name = 'policyCertificate' - - # get certificate policy - (cert_bundle, cert_policy) = self._import_common_certificate(vault_uri, cert_name) - retrieved_policy = self.client.get_certificate_policy(vault_uri, cert_name) - self.assertIsNotNone(retrieved_policy) - - # update certificate policy - cert_policy = CertificatePolicy(key_properties=KeyProperties(exportable=True, - key_type='RSA', - key_size=2048, - reuse_key=False), - secret_properties=SecretProperties(content_type='application/x-pkcs12'), - issuer_parameters=IssuerParameters(name='Self') - ) - - self.client.update_certificate_policy(vault_uri, cert_name, cert_policy) - updated_cert_policy = self.client.get_certificate_policy(vault_uri, cert_name) - self.assertIsNotNone(updated_cert_policy) - - @ResourceGroupPreparer() - @KeyVaultPreparer() def test_manual_enrolled(self, vault, **kwargs): self.assertIsNotNone(vault) vault_uri = vault.properties.vault_uri @@ -451,65 +353,3 @@ class KeyVaultCertificateTest(KeyvaultTe pass finally: self.client.delete_certificate(vault_uri, cert_name) - - @ResourceGroupPreparer() - @KeyVaultPreparer(enable_soft_delete=True) - def test_recover_and_purge(self, vault, **kwargs): - self.assertIsNotNone(vault) - vault_uri = vault.properties.vault_uri - - certs = {} - cert_policy = CertificatePolicy(key_properties=KeyProperties(exportable=True, - key_type='RSA', - key_size=2048, - reuse_key=False), - secret_properties=SecretProperties(content_type='application/x-pkcs12'), - issuer_parameters=IssuerParameters(name='Self'), - x509_certificate_properties=X509CertificateProperties( - subject='CN=*.microsoft.com', - subject_alternative_names=SubjectAlternativeNames( - dns_names=['onedrive.microsoft.com', 'xbox.microsoft.com'] - ), - validity_in_months=24 - )) - # create certificates to recover - for i in range(0, self.list_test_size): - cert_name = self.get_resource_name('certrec{}'.format(str(i))) - certs[cert_name] = self._import_common_certificate(vault_uri, cert_name) - - # create certificates to purge - for i in range(0, self.list_test_size): - cert_name = self.get_resource_name('certprg{}'.format(str(i))) - certs[cert_name] = self._import_common_certificate(vault_uri, cert_name) - - # delete all certificates - for cert_name in certs.keys(): - delcert = self.client.delete_certificate(vault_uri, cert_name) - print(delcert) - - if not self.is_playback(): - time.sleep(30) - - # validate all our deleted certificates are returned by get_deleted_certificates - deleted = [KeyVaultId.parse_certificate_id(s.id).name for s in self.client.get_deleted_certificates(vault_uri)] - # self.assertTrue(all(s in deleted for s in certs.keys())) - - # recover select secrets - for certificate_name in [s for s in certs.keys() if s.startswith('certrec')]: - self.client.recover_deleted_certificate(vault_uri, certificate_name) - - # purge select secrets - for certificate_name in [s for s in certs.keys() if s.startswith('certprg')]: - self.client.purge_deleted_certificate(vault_uri, certificate_name) - - if not self.is_playback(): - time.sleep(30) - - # validate none of our deleted certificates are returned by get_deleted_certificates - deleted = [KeyVaultId.parse_secret_id(s.id).name for s in self.client.get_deleted_certificates(vault_uri)] - self.assertTrue(not any(s in deleted for s in certs.keys())) - - # validate the recovered certificates - expected = {k: v for k, v in certs.items() if k.startswith('certrec')} - actual = {k: self.client.get_certificate(vault_uri, k, KeyVaultId.version_none) for k in expected.keys()} - self.assertEqual(len(set(expected.keys()) & set(actual.keys())), len(expected)) diff -up ./azure-keyvault/tests/test_keys.py.orig ./azure-keyvault/tests/test_keys.py --- ./azure-keyvault/tests/test_keys.py.orig 2018-08-02 18:37:34.000000000 +0200 +++ ./azure-keyvault/tests/test_keys.py 2019-04-16 17:17:46.792444105 +0200 @@ -73,54 +73,6 @@ class KeyVaultKeyTest(KeyvaultTestCase): @ResourceGroupPreparer() @KeyVaultPreparer() - def test_key_crud_operations(self, vault, **kwargs): - self.assertIsNotNone(vault) - vault_uri = vault.properties.vault_uri - key_name = self.get_resource_name('key') - - # create key - created_bundle = self.client.create_key(vault_uri, key_name, 'RSA') - self._validate_rsa_key_bundle(created_bundle, vault_uri, key_name, 'RSA') - key_id = KeyVaultId.parse_key_id(created_bundle.key.kid) - - # get key without version - self.assertEqual(created_bundle, self.client.get_key(key_id.vault, key_id.name, '')) - - # get key with version - self.assertEqual(created_bundle, self.client.get_key(key_id.vault, key_id.name, key_id.version)) - - def _update_key(key_uri): - updating_bundle = copy.deepcopy(created_bundle) - updating_bundle.attributes.expires = date_parse.parse('2050-02-02T08:00:00.000Z') - updating_bundle.key.key_ops = ['encrypt', 'decrypt'] - updating_bundle.tags = {'foo': 'updated tag'} - kid = KeyVaultId.parse_key_id(key_uri) - key_bundle = self.client.update_key( - kid.vault, kid.name, kid.version, updating_bundle.key.key_ops, updating_bundle.attributes, - updating_bundle.tags) - self.assertEqual(updating_bundle.tags, key_bundle.tags) - self.assertEqual(updating_bundle.key.kid, key_bundle.key.kid) - return key_bundle - - # update key without version - created_bundle = _update_key(key_id.base_id) - - # update key with version - created_bundle = _update_key(key_id.id) - - # delete key - self.client.delete_key(key_id.vault, key_id.name) - - # get key returns not found - try: - self.client.get_key(key_id.vault, key_id.name, '') - self.fail('Get should fail') - except Exception as ex: - if not hasattr(ex, 'message') or 'not found' not in ex.message.lower(): - raise ex - - @ResourceGroupPreparer() - @KeyVaultPreparer() def test_key_list(self, vault, **kwargs): self.assertIsNotNone(vault) vault_uri = vault.properties.vault_uri diff -up ./azure-keyvault/tests/test_secrets.py.orig ./azure-keyvault/tests/test_secrets.py --- ./azure-keyvault/tests/test_secrets.py.orig 2018-08-02 18:37:34.000000000 +0200 +++ ./azure-keyvault/tests/test_secrets.py 2019-04-16 17:31:39.203646018 +0200 @@ -31,56 +31,6 @@ class KeyVaultSecretTest(KeyvaultTestCas @ResourceGroupPreparer() @KeyVaultPreparer() - def test_secret_crud_operations(self, vault, **kwargs): - self.assertIsNotNone(vault) - vault_uri = vault.properties.vault_uri - secret_name = 'crud-secret' - secret_value = self.get_resource_name('crud_secret_value') - - # create secret - secret_bundle = self.client.set_secret(vault_uri, secret_name, secret_value) - self._validate_secret_bundle(secret_bundle, vault_uri, secret_name, secret_value) - created_bundle = secret_bundle - secret_id = KeyVaultId.parse_secret_id(created_bundle.id) - - # get secret without version - self.assertEqual(created_bundle, self.client.get_secret(secret_id.vault, secret_id.name, '')) - - # get secret with version - self.assertEqual(created_bundle, self.client.get_secret(secret_id.vault, secret_id.name, secret_id.version)) - - def _update_secret(secret_uri): - updating_bundle = copy.deepcopy(created_bundle) - updating_bundle.content_type = 'text/plain' - updating_bundle.attributes.expires = date_parse.parse('2050-02-02T08:00:00.000Z') - updating_bundle.tags = {'foo': 'updated tag'} - sid = KeyVaultId.parse_secret_id(secret_uri) - secret_bundle = self.client.update_secret( - sid.vault, sid.name, sid.version, updating_bundle.content_type, updating_bundle.attributes, - updating_bundle.tags) - self.assertEqual(updating_bundle.tags, secret_bundle.tags) - self.assertEqual(updating_bundle.id, secret_bundle.id) - self.assertNotEqual(str(updating_bundle.attributes.updated), str(secret_bundle.attributes.updated)) - return secret_bundle - - # update secret without version - secret_bundle = _update_secret(secret_id.base_id) - - # update secret with version - secret_bundle = _update_secret(secret_id.id) - - # delete secret - self.client.delete_secret(secret_id.vault, secret_id.name) - - # get secret returns not found - try: - self.client.get_secret(secret_id.vault, secret_id.name, '') - except Exception as ex: - if not hasattr(ex, 'message') or 'not found' not in ex.message.lower(): - raise ex - - @ResourceGroupPreparer() - @KeyVaultPreparer() def test_secret_list(self, vault, **kwargs): self.assertIsNotNone(vault) vault_uri = vault.properties.vault_uri