Skip to content

Commit c7ff43b

Browse files
authored
Merge pull request #70 from Venafi/vaas_pm_fix
Service generated CSR implementation
2 parents 4f83f60 + 1d62033 commit c7ff43b

7 files changed

Lines changed: 245 additions & 131 deletions

File tree

examples/get_cert_service_tpp.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright 2019 Venafi, Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from __future__ import print_function
19+
20+
from vcert import (CertificateRequest, venafi_connection, RevocationRequest, CSR_ORIGIN_SERVICE)
21+
import string
22+
import random
23+
import logging
24+
import time
25+
from os import environ
26+
27+
logging.basicConfig(level=logging.INFO)
28+
logging.getLogger("urllib3").setLevel(logging.ERROR)
29+
30+
31+
def main():
32+
# Get credentials from environment variables
33+
url = environ.get('TPP_TOKEN_URL')
34+
user = environ.get('TPP_USER')
35+
password = environ.get('TPP_PASSWORD')
36+
zone = environ.get("TPP_ZONE")
37+
server_trust_bundle = environ.get('TPP_TRUST_BUNDLE')
38+
39+
# Connection will be chosen automatically based on which arguments are passed.
40+
# If token is passed Venafi Cloud connection will be used.
41+
# If user, password, and URL Venafi Platform (TPP) will be used.
42+
conn = venafi_connection(url=url, user=user, password=password,
43+
http_request_kwargs={"verify": server_trust_bundle})
44+
# If your TPP server certificate signed with your own CA, or available only via proxy, you can specify
45+
# a trust bundle using requests vars:
46+
# conn = Connection(url=url, token=token, user=user, password=password,
47+
# http_request_kwargs={"verify": "/path-to/bundle.pem"})
48+
49+
request = CertificateRequest(common_name=random_word(10) + ".venafi.example.com")
50+
request.csr_origin = CSR_ORIGIN_SERVICE
51+
request.san_dns = ["www.client.venafi.example.com", "ww1.client.venafi.example.com"]
52+
request.email_addresses = ["e1@venafi.example.com", "e2@venafi.example.com"]
53+
request.ip_addresses = ["127.0.0.1", "192.168.1.1"]
54+
request.uniform_resource_identifiers = ["http://wgtest.com","https://ragnartest.com"]
55+
request.user_principal_names = ["e1@venafi.example.com", "e2@venafi.example.com"]
56+
# Specify ordering certificates in chain. Root can be "first" or "last". By default it last. You also can
57+
# specify "ignore" to ignore chain (supported only for Platform).
58+
# To set Custom Fields for the certificate, specify an array of CustomField objects as name-value pairs
59+
# request.custom_fields = [
60+
# CustomField(name="Cost Center", value="ABC123"),
61+
# CustomField(name="Environment", value="Production"),
62+
# CustomField(name="Environment", value="Staging")
63+
# ]
64+
# Update certificate request from zone
65+
zone_config = conn.read_zone_conf(zone)
66+
request.update_from_zone_config(zone_config)
67+
conn.request_cert(request, zone)
68+
69+
# and wait for signing
70+
cert = conn.retrieve_cert(request)
71+
72+
# after that print cert and key
73+
print(cert.full_chain, request.private_key_pem, sep="\n")
74+
# and save into file
75+
f = open("/tmp/cert.pem", "w")
76+
f.write(cert.full_chain)
77+
f = open("/tmp/cert.key", "w")
78+
f.write(request.private_key_pem)
79+
f.close()
80+
81+
print("Trying to renew certificate")
82+
new_request = CertificateRequest(cert_id=request.id)
83+
conn.renew_cert(new_request)
84+
new_cert = conn.retrieve_cert(new_request)
85+
print(new_cert.cert, new_request.private_key_pem, sep="\n")
86+
fn = open("/tmp/new_cert.pem", "w")
87+
fn.write(new_cert.cert)
88+
fn = open("/tmp/new_cert.key", "w")
89+
fn.write(new_request.private_key_pem)
90+
fn.close()
91+
92+
93+
def random_word(length):
94+
letters = string.ascii_lowercase
95+
return ''.join(random.choice(letters) for i in range(length))
96+
97+
98+
if __name__ == '__main__':
99+
main()

tests/test_e2e.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
RANDOM_DOMAIN, CLOUD_ZONE, \
3737
TPP_ZONE, TPP_ZONE_ECDSA
3838
from vcert import CloudConnection, CertificateRequest, TPPConnection, FakeConnection, ZoneConfig, RevocationRequest, \
39-
TPPTokenConnection, CertField, KeyType, CustomField
39+
TPPTokenConnection, CertField, KeyType, CustomField, CSR_ORIGIN_SERVICE
4040
from vcert.errors import ClientBadData, ServerUnexptedBehavior
4141
from vcert.pem import parse_pem
4242

@@ -287,6 +287,15 @@ def test_tpp_token_enroll(self):
287287
except Exception as err:
288288
self.fail("Error in test: %s" % err.message)
289289

290+
def test_tpp_token_enroll_with_service_generated_csr(self):
291+
cn = random_word(10) + ".venafi.example.com"
292+
try:
293+
_, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, service_generated_csr=True)
294+
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
295+
self.assertEqual(cert_config["Origin"], "Venafi VCert-Python")
296+
except Exception as err:
297+
self.fail("Error in test: %s" % err.message)
298+
290299
def test_tpp_token_enroll_with_custom_fields(self):
291300
cn = random_word(10) + ".venafi.example.com"
292301
custom_fields = [
@@ -496,7 +505,8 @@ def enroll_with_zone_update(conn, zone, cn=None):
496505
return cert, request.cert_guid
497506

498507

499-
def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None, csr=None, custom_fields=None):
508+
def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None, csr=None, custom_fields=None,
509+
service_generated_csr=False):
500510
request = CertificateRequest(
501511
common_name=cn,
502512
private_key=private_key,
@@ -515,6 +525,8 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None
515525

516526
if csr:
517527
request.csr = csr
528+
elif service_generated_csr:
529+
request.csr_origin = CSR_ORIGIN_SERVICE
518530

519531
conn.request_cert(request, zone)
520532
cert = conn.retrieve_cert(request)
@@ -523,9 +535,10 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None
523535
# and save into file
524536
f = open("./cert.pem", "w")
525537
f.write(cert.full_chain)
526-
f = open("./cert.key", "w")
527-
f.write(request.private_key_pem)
528-
f.close()
538+
if not service_generated_csr:
539+
f = open("./cert.key", "w")
540+
f.write(request.private_key_pem)
541+
f.close()
529542

530543
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
531544
assert isinstance(cert, x509.Certificate)
@@ -551,11 +564,15 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None
551564
format=serialization.PublicFormat.SubjectPublicKeyInfo
552565
).decode()
553566
else:
554-
source_public_key_pem = request.public_key_pem
567+
source_public_key_pem = request.public_key_pem if not service_generated_csr else None
555568
print(source_public_key_pem)
556569
print(cert_public_key_pem)
557-
assert source_public_key_pem == cert_public_key_pem
558-
return request.id, request.private_key_pem, cert, cert_public_key_pem, request.cert_guid
570+
571+
if not service_generated_csr:
572+
assert source_public_key_pem == cert_public_key_pem
573+
private_key_pem = request.private_key_pem if not service_generated_csr else None
574+
575+
return request.id, private_key_pem, cert, cert_public_key_pem, request.cert_guid
559576

560577

561578
def renew(conn, cert_id, pkey, sn, cn):

vcert/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
#
1616

1717
from .common import CertificateRequest, CommonConnection, RevocationRequest, ZoneConfig, CertField, KeyType, \
18-
CustomField, Authentication, SCOPE_CM, SCOPE_PM, SCOPE_SSH
18+
CustomField, Authentication, SCOPE_CM, SCOPE_PM, SCOPE_SSH, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \
19+
CSR_ORIGIN_SERVICE
1920
from .connection_cloud import CloudConnection
2021
from .connection_tpp import TPPConnection
2122
from .connection_tpp_token import TPPTokenConnection

vcert/common.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
MIME_ANY = "*/*"
4747
LOCALHOST = "127.0.0.1"
4848
DEFAULT_TIMEOUT = 180
49+
CSR_ORIGIN_PROVIDED = "provided"
50+
CSR_ORIGIN_LOCAL = "local"
51+
CSR_ORIGIN_SERVICE = "service"
4952

5053

5154
class CertField:
@@ -257,7 +260,8 @@ def __init__(self, cert_id=None,
257260
locality=None,
258261
origin=None,
259262
custom_fields=None,
260-
timeout=DEFAULT_TIMEOUT
263+
timeout=DEFAULT_TIMEOUT,
264+
csr_origin=CSR_ORIGIN_LOCAL
261265
):
262266
"""
263267
:param str cert_id: Certificate request id. Generating by server.
@@ -277,6 +281,7 @@ def __init__(self, cert_id=None,
277281
:param str origin: application identifier
278282
:param list[CustomField] custom_fields: list of custom fields values to be added to the certificate.
279283
:param int timeout: Timeout for the certificate to be retrieved from server. Measured in seconds.
284+
:param str csr_origin: The origin of the CSR, either user provided, locally generated or service generated.
280285
"""
281286

282287
self.chain_option = "last"
@@ -302,6 +307,10 @@ def __init__(self, cert_id=None,
302307
self.locality = locality
303308
# CSR should be last, because it checks subject to match with over parameters
304309
self.csr = csr
310+
if self.csr:
311+
self.csr_origin = CSR_ORIGIN_PROVIDED
312+
else:
313+
self.csr_origin = csr_origin
305314
self.origin = origin or "Venafi VCert-Python"
306315
self.custom_fields = custom_fields
307316
self.cert_guid = None
@@ -330,6 +339,7 @@ def __setattr__(self, key, value):
330339
else:
331340
raise ClientBadData("invalid private key type %s" % type(value))
332341
elif key == "csr":
342+
self.csr_origin = CSR_ORIGIN_PROVIDED
333343
if isinstance(value, binary_type):
334344
value = value.decode()
335345
elif not (isinstance(value, string_types) or value is None):
@@ -356,7 +366,7 @@ def __setattr__(self, key, value):
356366
self.__dict__[key] = value
357367

358368
def _gen_key(self):
359-
if self.key_type == None:
369+
if self.key_type is None:
360370
self.key_type = KeyType(KeyType.RSA, 2048)
361371
if self.key_type.key_type == KeyType.RSA:
362372
self.private_key = rsa.generate_private_key(
@@ -439,7 +449,7 @@ def build_csr(self):
439449
# string we'll convert it into a bytes object then insert our header. Otherwise, we'll just
440450
# insert the header in the passed in bytes.
441451
if isinstance(upn, str):
442-
bupn = bytes(upn,'utf-8')
452+
bupn = bytes(upn, 'utf-8')
443453
else:
444454
bupn = upn
445455
values = [12,len(upn)]

vcert/connection_tpp.py

Lines changed: 2 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
from cryptography.hazmat.backends import default_backend
2828
from cryptography.x509 import SignatureAlgorithmOID as AlgOID
2929

30-
from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType
30+
from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType, CSR_ORIGIN_LOCAL, CSR_ORIGIN_SERVICE, \
31+
CSR_ORIGIN_PROVIDED
3132
from .connection_tpp_abstract import AbstractTPPConnection, URLS
3233
from .errors import (ServerUnexptedBehavior, ClientBadData, CertificateRequestError, AuthenticationError,
3334
CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError)
@@ -114,51 +115,6 @@ def auth(self):
114115
log.error("Authentication status is not %s but %s. Exiting" % (HTTPStatus.OK, status[0]))
115116
raise AuthenticationError
116117

117-
# TODO: Need to add service generated CSR implementation
118-
def request_cert(self, request, zone):
119-
if not request.csr:
120-
request.build_csr()
121-
request_data = {"PolicyDN": self._get_policy_dn(zone),
122-
"PKCS10": request.csr,
123-
"ObjectName": request.friendly_name,
124-
"DisableAutomaticRenewal": "true"}
125-
if request.origin:
126-
request_data["Origin"] = request.origin
127-
ca_origin = {"Name": "Origin", "Value": request.origin}
128-
if request_data.get("CASpecificAttributes"):
129-
request_data["CASpecificAttributes"].append(ca_origin)
130-
else:
131-
request_data["CASpecificAttributes"] = [ca_origin]
132-
133-
if request.custom_fields:
134-
custom_fields_map = {}
135-
for c_field in request.custom_fields:
136-
if custom_fields_map.get(c_field.name):
137-
custom_fields_map[c_field.name].append(c_field.value)
138-
else:
139-
custom_fields_map[c_field.name] = [c_field.value]
140-
141-
for key in custom_fields_map:
142-
custom_field_json = {
143-
"Name": key,
144-
"Values": custom_fields_map[key]
145-
}
146-
if request_data.get("CustomFields"):
147-
request_data["CustomFields"].append(custom_field_json)
148-
else:
149-
request_data["CustomFields"] = [custom_field_json]
150-
151-
status, data = self._post(URLS.CERTIFICATE_REQUESTS, data=request_data)
152-
if status == HTTPStatus.OK:
153-
request.id = data['CertificateDN']
154-
request.cert_guid = data['Guid']
155-
log.debug("Certificate successfully requested with request id %s." % request.id)
156-
log.debug("Certificate successfully requested with GUID %s." % request.cert_guid)
157-
return True
158-
159-
log.error("Request status is not %s. %s." % HTTPStatus.OK, status)
160-
raise CertificateRequestError
161-
162118
def retrieve_cert(self, cert_request):
163119
log.debug("Getting certificate status for id %s" % cert_request.id)
164120

@@ -336,19 +292,6 @@ def read_zone_conf(self, tag):
336292
def import_cert(self, request):
337293
raise NotImplementedError
338294

339-
@staticmethod
340-
def _get_policy_dn(zone):
341-
if zone is None:
342-
log.error("Bad zone: %s" % zone)
343-
raise ClientBadData
344-
if re.match(r"^\\\\VED\\\\Policy", zone):
345-
return zone
346-
else:
347-
if re.match(r"^\\\\", zone):
348-
return r"\\VED\\Policy" + zone
349-
else:
350-
return r"\\VED\\Policy\\" + zone
351-
352295
def search_by_thumbprint(self, thumbprint):
353296
"""
354297
:param str thumbprint:

0 commit comments

Comments
 (0)