Skip to content

Commit 2eb20df

Browse files
authored
Merge pull request #76 from Venafi/flexible_validity_period
- Added support for flexible validity periods.
2 parents bd6d6cc + 14e1f68 commit 2eb20df

15 files changed

Lines changed: 801 additions & 696 deletions

tests/test_e2e.py

Lines changed: 6 additions & 448 deletions
Large diffs are not rendered by default.

tests/test_env.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
#
17-
import random
18-
import string
1917
from os import environ
2018

21-
from future.backports.datetime import datetime
2219
from six import text_type
2320

2421
FAKE = environ.get("FAKE")
@@ -43,12 +40,3 @@
4340

4441
if RANDOM_DOMAIN and not isinstance(RANDOM_DOMAIN, text_type):
4542
RANDOM_DOMAIN = RANDOM_DOMAIN.decode()
46-
47-
48-
def random_word(length):
49-
letters = string.ascii_lowercase
50-
return ''.join(random.choice(letters) for _ in range(length))
51-
52-
53-
def timestamp():
54-
return datetime.today().strftime('%Y.%m.%d-%Hh%Mm%Ss')

tests/test_local_methods.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@
161161

162162
chain = [ca_cert, ca_cert]
163163

164-
log = logger.get_child("test-local-methods")
164+
log = logger.get_child("test-local")
165165

166166

167167
class TestLocalMethods(unittest.TestCase):
@@ -347,4 +347,3 @@ def test_pkcs12_plain_pk(self):
347347
cert = Certificate(cert=pkcs12_plain_cert, chain=chain, key=pkcs12_plain_pk)
348348
output = cert.as_pkcs12()
349349
log.info("PKCS12 created successfully:\n%s" % output)
350-
pass

tests/test_pm.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
# limitations under the License.
1616
#
1717

18-
import logging
1918
import os
2019
import unittest
2120
from pprint import pformat
2221

2322
from test_env import TPP_TOKEN_URL, CLOUD_APIKEY, CLOUD_URL, TPP_PM_ROOT, CLOUD_ENTRUST_CA_NAME, \
24-
CLOUD_DIGICERT_CA_NAME, TPP_CA_NAME, TPP_USER, TPP_PASSWORD, timestamp
25-
from vcert import TPPTokenConnection, CloudConnection, Authentication, SCOPE_PM
23+
CLOUD_DIGICERT_CA_NAME, TPP_CA_NAME, TPP_USER, TPP_PASSWORD
24+
from test_utils import timestamp
25+
from vcert import TPPTokenConnection, CloudConnection, Authentication, SCOPE_PM, logger
2626
from vcert.parser import json_parser, yaml_parser
2727
from vcert.parser.utils import parse_policy_spec
2828
from vcert.policy import Policy, Subject, KeyPair, SubjectAltNames, Defaults, DefaultSubject, DefaultKeyPair, \
@@ -33,8 +33,7 @@
3333
POLICY_SPEC_YAML = '/resources/policy_specification.yaml'
3434
CA_TYPE_TPP = 'TPP'
3535

36-
logging.basicConfig(level=logging.DEBUG)
37-
logger = logging.getLogger('vcert-test')
36+
log = logger.get_child("test-pm")
3837

3938

4039
class TestParsers(unittest.TestCase):
@@ -167,8 +166,8 @@ def create_policy(connector, zone, policy_spec=None, policy=None, defaults=None)
167166
connector.set_policy(zone, policy_spec)
168167
resp = connector.get_policy(zone)
169168
data = parse_policy_spec(resp)
170-
logger.debug('Created Policy at %s' % zone)
171-
logger.debug(pformat(data))
169+
log.debug('Created Policy at %s' % zone)
170+
log.debug(pformat(data))
172171
return resp
173172

174173

@@ -251,7 +250,7 @@ def _get_tpp_policy_name():
251250

252251
def _resolve_resources_path(path):
253252
resources_dir = os.path.dirname(__file__)
254-
logger.debug('Testing root folder: [%s]' % resources_dir)
253+
log.debug('Testing root folder: [%s]' % resources_dir)
255254
resolved_path = ('.%s' % path) if resources_dir.endswith('tests') else ('./tests%s' % path)
256-
logger.debug('resolved path: [%s]' % resolved_path)
255+
log.debug('resolved path: [%s]' % resolved_path)
257256
return resolved_path

tests/test_ssh.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@
2020
import unittest
2121

2222
from assets import SSH_CERT_DATA, SSH_PRIVATE_KEY, SSH_PUBLIC_KEY
23-
from test_env import timestamp, TPP_TOKEN_URL, TPP_USER, TPP_PASSWORD, TPP_SSH_CADN
23+
from test_env import TPP_TOKEN_URL, TPP_USER, TPP_PASSWORD, TPP_SSH_CADN
24+
from test_utils import timestamp
2425
from vcert import CommonConnection, SSHCertRequest, TPPTokenConnection, Authentication, \
25-
SCOPE_SSH, write_ssh_files
26+
SCOPE_SSH, write_ssh_files, logger
2627
from vcert.ssh_utils import SSHRetrieveResponse, SSHKeyPair
2728

28-
logging.basicConfig(level=logging.DEBUG)
29-
logger = logging.getLogger('vcert-test')
30-
29+
log = logger.get_child("test-ssh")
3130

3231
SERVICE_GENERATED_NO_KEY_ERROR = "%s key data is %s empty for Certificate %s" # type: str
3332
SSH_CERT_DATA_ERROR = "Certificate data is empty for Certificate %s" # type: str
@@ -84,7 +83,7 @@ def test_write_ssh_files(self):
8483
with open(full_path, "r") as priv_key_file:
8584
s_priv_key = priv_key_file.read()
8685
expected_priv_key = SSH_PRIVATE_KEY
87-
if platform.system() is not "Windows":
86+
if platform.system() != "Windows":
8887
expected_priv_key = expected_priv_key.replace("\r\n", "\n")
8988

9089
self.assertTrue(expected_priv_key == s_priv_key, err_msg % "SSH Private Key")

tests/test_tpp_token.py

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright 2021 Venafi, Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
import binascii
18+
import time
19+
import unittest
20+
from datetime import datetime, timedelta
21+
22+
from cryptography import x509
23+
from cryptography.hazmat.backends import default_backend
24+
from cryptography.hazmat.primitives import hashes
25+
26+
from assets import TEST_KEY_ECDSA, TEST_KEY_RSA_4096, TEST_KEY_RSA_2048_ENCRYPTED
27+
from test_env import TPP_ZONE, TPP_ZONE_ECDSA, TPP_USER, TPP_PASSWORD, TPP_TOKEN_URL
28+
from test_utils import random_word, enroll, renew, renew_by_thumbprint, renew_without_key_reuse, \
29+
enroll_with_zone_update, simple_enroll
30+
from vcert import CustomField, KeyType, RevocationRequest, CertificateRequest, IssuerHint, logger, \
31+
TPPTokenConnection
32+
from vcert.errors import ClientBadData, ServerUnexptedBehavior
33+
34+
log = logger.get_child("test-tpp-token")
35+
36+
37+
class TestTPPTokenMethods(unittest.TestCase):
38+
def __init__(self, *args, **kwargs):
39+
self.tpp_zone = TPP_ZONE
40+
self.tpp_zone_ecdsa = TPP_ZONE_ECDSA
41+
self.tpp_conn = TPPTokenConnection(url=TPP_TOKEN_URL, user=TPP_USER, password=TPP_PASSWORD,
42+
http_request_kwargs={"verify": "/tmp/chain.pem"})
43+
super(TestTPPTokenMethods, self).__init__(*args, **kwargs)
44+
45+
def test_tpp_token_enroll(self):
46+
cn = random_word(10) + ".venafi.example.com"
47+
try:
48+
cert_id, pkey, cert, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn)
49+
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
50+
self.assertEqual(cert_config["Origin"], "Venafi VCert-Python")
51+
except Exception as err:
52+
self.fail("Error in test: %s" % err.message)
53+
54+
def test_tpp_token_enroll_with_service_generated_csr(self):
55+
cn = random_word(10) + ".venafi.example.com"
56+
try:
57+
_, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, password="FooBarPass123",
58+
service_generated_csr=True)
59+
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
60+
self.assertEqual(cert_config["Origin"], "Venafi VCert-Python")
61+
except Exception as err:
62+
self.fail("Error in test: %s" % err.message)
63+
64+
def test_tpp_token_enroll_with_custom_fields(self):
65+
cn = random_word(10) + ".venafi.example.com"
66+
custom_fields = [
67+
CustomField(name="custom", value="pythonTest"),
68+
CustomField(name="cfList", value="item2"),
69+
CustomField(name="cfListMulti", value="tier1"),
70+
CustomField(name="cfListMulti", value="tier4")
71+
]
72+
try:
73+
cert_id, pkey, cert, _, cert_guid = enroll(conn=self.tpp_conn, zone=self.tpp_zone, cn=cn,
74+
custom_fields=custom_fields)
75+
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
76+
self.assertEqual(cert_config["Origin"], "Venafi VCert-Python")
77+
except Exception as err:
78+
self.fail("Error in test: %s" % err.__str__)
79+
80+
def test_tpp_token_enroll_origin(self):
81+
cn = random_word(10) + ".venafi.example.com"
82+
try:
83+
cert_id, pkey, cert, _, _ = enroll(self.tpp_conn, self.tpp_zone, cn)
84+
except Exception as err:
85+
self.fail("Error in test: %s" % err.__str__())
86+
87+
def test_tpp_token_renew(self):
88+
cn = random_word(10) + ".venafi.example.com"
89+
cert_id, pkey, cert, _, _ = enroll(self.tpp_conn, self.tpp_zone, cn)
90+
cert = renew(self.tpp_conn, cert_id, pkey, cert.serial_number, cn)
91+
92+
def test_tpp_token_renew_twice(self):
93+
cn = random_word(10) + ".venafi.example.com"
94+
cert_id, pkey, cert, _, _ = enroll(self.tpp_conn, self.tpp_zone, cn)
95+
time.sleep(5)
96+
renew(self.tpp_conn, cert_id, pkey, cert.serial_number, cn)
97+
time.sleep(5)
98+
renew(self.tpp_conn, cert_id, pkey, cert.serial_number, cn)
99+
100+
def test_tpp_token_renew_by_thumbprint(self):
101+
cn = random_word(10) + ".venafi.example.com"
102+
cert_id, pkey, cert, _, _ = enroll(self.tpp_conn, self.tpp_zone, cn)
103+
renew_by_thumbprint(self.tpp_conn, cert)
104+
105+
def test_tpp_token_renew_without_key_reuse(self):
106+
renew_without_key_reuse(self, self.tpp_conn, self.tpp_zone)
107+
108+
def test_tpp_token_enroll_ecdsa(self):
109+
cn = random_word(10) + ".venafi.example.com"
110+
enroll(self.tpp_conn, self.tpp_zone_ecdsa, cn, TEST_KEY_ECDSA[0], TEST_KEY_ECDSA[1])
111+
112+
def test_tpp_token_enroll_with_custom_key(self):
113+
cn = random_word(10) + ".venafi.example.com"
114+
enroll(self.tpp_conn, self.tpp_zone, cn, TEST_KEY_RSA_4096[0], TEST_KEY_RSA_4096[1])
115+
116+
def test_tpp_token_enroll_with_encrypted_key(self):
117+
cn = random_word(10) + ".venafi.example.com"
118+
enroll(self.tpp_conn, self.tpp_zone, cn, TEST_KEY_RSA_2048_ENCRYPTED[0], TEST_KEY_RSA_2048_ENCRYPTED[1],
119+
'venafi')
120+
121+
def test_tpp_token_enroll_with_custom_csr(self):
122+
key = open("/tmp/csr-test.key.pem").read()
123+
csr = open("/tmp/csr-test.csr.csr").read()
124+
enroll(self.tpp_conn, self.tpp_zone, private_key=key, csr=csr)
125+
126+
def test_tpp_token_enroll_with_zone_update_and_custom_origin(self):
127+
cn = random_word(10) + ".venafi.example.com"
128+
cert, cert_guid = enroll_with_zone_update(self.tpp_conn, self.tpp_zone_ecdsa, cn)
129+
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
130+
self.assertEqual(cert_config["Origin"], "Python-SDK ECDSA")
131+
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
132+
key = cert.public_key()
133+
self.assertEqual(key.curve.name, "secp521r1")
134+
135+
def test_tpp_token_read_zone_config(self):
136+
zone = self.tpp_conn.read_zone_conf(self.tpp_zone)
137+
self.assertEqual(zone.country.value, "US")
138+
self.assertEqual(zone.province.value, "Utah")
139+
self.assertEqual(zone.locality.value, "Salt Lake")
140+
self.assertEqual(zone.organization.value, "Venafi Inc.")
141+
self.assertEqual(zone.organizational_unit.value, ["Integrations"])
142+
self.assertEqual(zone.key_type.key_type, KeyType.RSA)
143+
self.assertEqual(zone.key_type.option, 2048)
144+
145+
def test_tpp_token_read_zone_unknown_zone(self):
146+
with self.assertRaises(Exception):
147+
self.tpp_conn.read_zone_conf("fdsfsd")
148+
149+
def test_tpp_token_retrieve_non_issued(self):
150+
with self.assertRaises(Exception):
151+
self.tpp_conn.retrieve_cert(self.tpp_zone + "\\devops\\vcert\\test-non-issued.example.com")
152+
153+
def test_tpp_token_search_by_thumbprint(self):
154+
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
155+
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
156+
fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode()
157+
found = self.tpp_conn.search_by_thumbprint(fingerprint)
158+
self.assertEqual(found, req.id)
159+
160+
def test_token_revoke_not_issued(self):
161+
req = RevocationRequest(req_id=self.tpp_zone + '\\not-issued.example.com')
162+
with self.assertRaises(Exception):
163+
self.tpp_conn.revoke_cert(req)
164+
req = RevocationRequest(thumbprint="2b25ff9f8725dfee37c6a7adcba31897b12e921d")
165+
with self.assertRaises(Exception):
166+
self.tpp_conn.revoke_cert(req)
167+
req = RevocationRequest()
168+
with self.assertRaises(Exception):
169+
self.tpp_conn.revoke_cert(req)
170+
171+
def test_token_revoke_normal(self):
172+
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
173+
rev_req = RevocationRequest(req_id=req.id)
174+
self.tpp_conn.revoke_cert(rev_req)
175+
time.sleep(1)
176+
with self.assertRaises(Exception):
177+
self.tpp_conn.renew_cert(req)
178+
179+
def test_token_revoke_without_disable(self):
180+
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
181+
rev_req = RevocationRequest(req_id=req.id, disable=False)
182+
self.tpp_conn.revoke_cert(rev_req)
183+
time.sleep(1)
184+
self.tpp_conn.renew_cert(req)
185+
186+
def test_token_revoke_normal_thumbprint(self):
187+
req, cert = simple_enroll(self.tpp_conn, self.tpp_zone)
188+
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
189+
thumbprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode()
190+
rev_req = RevocationRequest(thumbprint=thumbprint)
191+
self.tpp_conn.revoke_cert(rev_req)
192+
time.sleep(1)
193+
with self.assertRaises(Exception):
194+
self.tpp_conn.renew_cert(req)
195+
196+
def test_tpp_token_enroll_valid_hours(self):
197+
cn = random_word(10) + ".venafi.example.com"
198+
request = CertificateRequest(common_name=cn)
199+
200+
request.san_dns = [u"www.client.venafi.example.com", u"ww1.client.venafi.example.com"]
201+
request.email_addresses = [u"e1@venafi.example.com", u"e2@venafi.example.com"]
202+
request.ip_addresses = [u"127.0.0.1", u"192.168.1.1"]
203+
request.user_principal_names = [u"e1@venafi.example.com", u"e2@venafi.example.com"]
204+
request.uniform_resource_identifiers = [u"https://www.venafi.com", u"https://venafi.cloud"]
205+
206+
custom_fields = [
207+
CustomField(name="custom", value="pythonTest"),
208+
CustomField(name="cfList", value="item2"),
209+
CustomField(name="cfListMulti", value="tier1"),
210+
CustomField(name="cfListMulti", value="tier4")
211+
]
212+
213+
request.custom_fields = custom_fields
214+
request.validity_hours = 144
215+
request.issuer_hint = IssuerHint.MICROSOFT
216+
expected_date = datetime.utcnow() + timedelta(hours=request.validity_hours)
217+
218+
self.tpp_conn.request_cert(request, self.tpp_zone)
219+
cert = self.tpp_conn.retrieve_cert(request)
220+
221+
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
222+
assert isinstance(cert, x509.Certificate)
223+
expiration_date = cert.not_valid_after
224+
# Due to some roundings and delays in operations on the server side, the certificate expiration date
225+
# is not exactly the same as the one used in the request. A gap is allowed in this scenario to compensate
226+
# this delays and roundings.
227+
delta = timedelta(seconds=60)
228+
date_format = "%Y-%m-%d %H:%M:%S"
229+
self.assertAlmostEqual(expected_date, expiration_date, delta=delta,
230+
msg="Delta between expected and expiration date is too big.\nExpected: %s\nGot: %s\n"
231+
"Expected_delta: %s seconds."
232+
% (expected_date.strftime(date_format), expiration_date.strftime(date_format),
233+
delta.total_seconds()))
234+
235+
def test_get_access_token(self):
236+
try:
237+
token_info = self.tpp_conn.get_access_token()
238+
self.assertIsNotNone(token_info)
239+
self.assertIsNotNone(token_info.access_token)
240+
self.assertIsNotNone(token_info.refresh_token)
241+
self.assertIsNotNone(token_info.expires)
242+
except ClientBadData:
243+
self.fail("Error in Test Data")
244+
except ServerUnexptedBehavior as sub:
245+
self.fail("Error from server: %s" % sub.__str__())
246+
247+
def test_refresh_access_token(self):
248+
try:
249+
self.tpp_conn.get_access_token()
250+
refresh_info = self.tpp_conn.refresh_access_token()
251+
self.assertIsNotNone(refresh_info)
252+
self.assertIsNotNone(refresh_info.access_token)
253+
self.assertIsNotNone(refresh_info.refresh_token)
254+
self.assertIsNotNone(refresh_info.expires)
255+
except ClientBadData:
256+
self.fail("Error in Test Data")
257+
except ServerUnexptedBehavior as sub:
258+
self.fail("Error from server: %s" % sub.__str__())
259+
260+
def test_revoke_access_token(self):
261+
try:
262+
self.tpp_conn.get_access_token()
263+
status, resp = self.tpp_conn.revoke_access_token()
264+
self.assertEqual(status, 200)
265+
except Exception as err:
266+
self.fail("Error happened: %s" % err.__str__())
267+
268+
cn = random_word(10) + ".venafi.example.com"
269+
with self.assertRaises(Exception):
270+
enroll(self.tpp_conn, self.tpp_zone, cn)

0 commit comments

Comments
 (0)