Skip to content

Commit 5ae4263

Browse files
authored
Merge pull request #71 from Venafi/service_generated_csr
Service generated csr
2 parents c7ff43b + 7ff725e commit 5ae4263

7 files changed

Lines changed: 172 additions & 206 deletions

File tree

examples/get_cert_service_tpp.py

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717

1818
from __future__ import print_function
1919

20-
from vcert import (CertificateRequest, venafi_connection, RevocationRequest, CSR_ORIGIN_SERVICE)
20+
from vcert import (CertificateRequest, venafi_connection, CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST)
2121
import string
2222
import random
2323
import logging
24-
import time
2524
from os import environ
2625

2726
logging.basicConfig(level=logging.INFO)
@@ -39,54 +38,60 @@ def main():
3938
# Connection will be chosen automatically based on which arguments are passed.
4039
# If token is passed Venafi Cloud connection will be used.
4140
# If user, password, and URL Venafi Platform (TPP) will be used.
42-
conn = venafi_connection(url=url, user=user, password=password,
43-
http_request_kwargs={"verify": server_trust_bundle})
4441
# If your TPP server certificate signed with your own CA, or available only via proxy, you can specify
45-
# a trust bundle using requests vars:
46-
# conn = Connection(url=url, token=token, user=user, password=password,
47-
# http_request_kwargs={"verify": "/path-to/bundle.pem"})
42+
# a trust bundle using http_request_kwargs.
43+
conn = venafi_connection(url=url, user=user, password=password, http_request_kwargs={"verify": server_trust_bundle})
4844

45+
# Build a Certificate request
4946
request = CertificateRequest(common_name=random_word(10) + ".venafi.example.com")
47+
# Set the request to use a service generated CSR
5048
request.csr_origin = CSR_ORIGIN_SERVICE
51-
request.san_dns = ["www.client.venafi.example.com", "ww1.client.venafi.example.com"]
52-
request.email_addresses = ["e1@venafi.example.com", "e2@venafi.example.com"]
49+
# Include some Subject Alternative Names
50+
request.san_dns = ["www.dns.venafi.example.com", "ww1.dns.venafi.example.com"]
51+
request.email_addresses = ["email1@venafi.example.com", "email2@venafi.example.com"]
5352
request.ip_addresses = ["127.0.0.1", "192.168.1.1"]
54-
request.uniform_resource_identifiers = ["http://wgtest.com","https://ragnartest.com"]
55-
request.user_principal_names = ["e1@venafi.example.com", "e2@venafi.example.com"]
56-
# Specify ordering certificates in chain. Root can be "first" or "last". By default it last. You also can
57-
# specify "ignore" to ignore chain (supported only for Platform).
53+
request.uniform_resource_identifiers = ["http://wgtest.uri.com","https://ragnartest.uri.com"]
54+
request.user_principal_names = ["upn1@venafi.example.com", "upn2@venafi.example.com"]
55+
# Specify ordering certificates in chain. Root can be CHAIN_OPTION_FIRST ("first")
56+
# or CHAIN_OPTION_LAST ("last"). By default it is CHAIN_OPTION_LAST.
57+
# You can also specify CHAIN_OPTION_IGNORE ("ignore") to ignore chain (supported only for TPP).
58+
# request.chain_option = CHAIN_OPTION_FIRST
5859
# To set Custom Fields for the certificate, specify an array of CustomField objects as name-value pairs
5960
# request.custom_fields = [
6061
# CustomField(name="Cost Center", value="ABC123"),
6162
# CustomField(name="Environment", value="Production"),
6263
# CustomField(name="Environment", value="Staging")
6364
# ]
64-
# Update certificate request from zone
65+
#
66+
# Update certificate request from zone.
6567
zone_config = conn.read_zone_conf(zone)
6668
request.update_from_zone_config(zone_config)
69+
# Request the certificate.
6770
conn.request_cert(request, zone)
6871

69-
# and wait for signing
72+
# Wait for the certificate to be retrieved.
73+
# This operation may take some time to return, as it waits until the certificate is ISSUED or it timeout.
74+
# Timeout is 180s by default. Can be changed using:
75+
# request.timeout = 300
7076
cert = conn.retrieve_cert(request)
7177

72-
# after that print cert and key
73-
print(cert.full_chain, request.private_key_pem, sep="\n")
74-
# and save into file
75-
f = open("/tmp/cert.pem", "w")
78+
# Print the certificate
79+
print(cert.full_chain)
80+
# Save it into a file
81+
f = open("./cert.pem", "w")
7682
f.write(cert.full_chain)
77-
f = open("/tmp/cert.key", "w")
78-
f.write(request.private_key_pem)
7983
f.close()
8084

8185
print("Trying to renew certificate")
8286
new_request = CertificateRequest(cert_id=request.id)
87+
# The renewal request should use a service generated CSR as well
88+
# This may not be necessary and depends entirely on the settings of your Policy/Zone
89+
new_request.csr_origin = CSR_ORIGIN_SERVICE
8390
conn.renew_cert(new_request)
8491
new_cert = conn.retrieve_cert(new_request)
85-
print(new_cert.cert, new_request.private_key_pem, sep="\n")
86-
fn = open("/tmp/new_cert.pem", "w")
92+
print(new_cert.cert)
93+
fn = open("./new_cert.pem", "w")
8794
fn.write(new_cert.cert)
88-
fn = open("/tmp/new_cert.key", "w")
89-
fn.write(new_request.private_key_pem)
9095
fn.close()
9196

9297

examples/get_cert_token.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def main():
3333
user = environ.get('TPP_USER')
3434
password = environ.get('TPP_PASSWORD')
3535
url = environ.get('TPP_TOKEN_URL')
36-
zone = environ.get("ZONE")
36+
zone = environ.get("TPP_ZONE")
3737
fake = environ.get('FAKE')
3838

3939
if fake:
@@ -54,7 +54,7 @@ def main():
5454
request.email_addresses = [u"e1@venafi.example.com", u"e2@venafi.example.com"]
5555
request.ip_addresses = [u"127.0.0.1", u"192.168.1.1"]
5656
request.uniform_resource_identifiers = [u"http://wgtest.com",u"https://ragnartest.com"]
57-
request.user_principal_names = [u"e1@venafi.example.com", u"e2@venafi.example.com"]
57+
request.user_principal_names = [u"e1@venafi.example.com", u"e2@venafi.example.com"]
5858
# Specify ordering certificates in chain. Root can be "first" or "last". By default its last. You also can
5959
# specify "ignore" to ignore chain (supported only for Platform).
6060
# To set Custom Fields for the certificate, specify an array of CustomField objects as name-value pairs

vcert/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
from .common import CertificateRequest, CommonConnection, RevocationRequest, ZoneConfig, CertField, KeyType, \
1818
CustomField, Authentication, SCOPE_CM, SCOPE_PM, SCOPE_SSH, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \
19-
CSR_ORIGIN_SERVICE
19+
CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE, CHAIN_OPTION_LAST
2020
from .connection_cloud import CloudConnection
2121
from .connection_tpp import TPPConnection
2222
from .connection_tpp_token import TPPTokenConnection

vcert/common.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@
4949
CSR_ORIGIN_PROVIDED = "provided"
5050
CSR_ORIGIN_LOCAL = "local"
5151
CSR_ORIGIN_SERVICE = "service"
52+
CHAIN_OPTION_FIRST = "first"
53+
CHAIN_OPTION_LAST = "last"
54+
CHAIN_OPTION_IGNORE = "ignore"
5255

5356

5457
class CertField:
@@ -284,7 +287,7 @@ def __init__(self, cert_id=None,
284287
:param str csr_origin: The origin of the CSR, either user provided, locally generated or service generated.
285288
"""
286289

287-
self.chain_option = "last"
290+
self.chain_option = CHAIN_OPTION_LAST # "last"
288291
self.san_dns = san_dns or []
289292
self.email_addresses = email_addresses
290293
self.ip_addresses = ip_addresses or []

vcert/connection_tpp.py

Lines changed: 1 addition & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -174,64 +174,6 @@ def revoke_cert(self, request):
174174

175175
raise ServerUnexptedBehavior
176176

177-
def renew_cert(self, request, reuse_key=False):
178-
if not request.id and not request.thumbprint:
179-
log.debug("Request id or thumbprint must be specified for TPP")
180-
raise CertificateRenewError
181-
if not request.id and request.thumbprint:
182-
request.id = self.search_by_thumbprint(request.thumbprint)
183-
if reuse_key:
184-
log.debug("Trying to renew certificate %s" % request.id)
185-
status, data = self._post(URLS.CERTIFICATE_RENEW, data={"CertificateDN": request.id})
186-
if not data['Success']:
187-
raise CertificateRenewError
188-
return
189-
cert = self.retrieve_cert(request)
190-
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
191-
for a in cert.subject:
192-
if a.oid == x509.NameOID.COMMON_NAME:
193-
request.common_name = a.value
194-
elif a.oid == x509.NameOID.COUNTRY_NAME:
195-
request.country = a.value
196-
elif a.oid == x509.NameOID.LOCALITY_NAME:
197-
request.locality = a.value
198-
elif a.oid == x509.NameOID.STATE_OR_PROVINCE_NAME:
199-
request.province = a.value
200-
elif a.oid == x509.NameOID.ORGANIZATION_NAME:
201-
request.organization = a.value
202-
elif a.oid == x509.NameOID.ORGANIZATIONAL_UNIT_NAME:
203-
request.organizational_unit = a.value
204-
for e in cert.extensions:
205-
if e.oid == x509.OID_SUBJECT_ALTERNATIVE_NAME:
206-
request.san_dns = list([x.value for x in e.value if isinstance(x, x509.DNSName)])
207-
request.email_addresses = list([x.value for x in e.value if isinstance(x, x509.RFC822Name)])
208-
request.ip_addresses = list([x.value.exploded for x in e.value if isinstance(x, x509.IPAddress)])
209-
# remove header bytes from ASN1 encoded UPN field before setting it in the request object
210-
upns = []
211-
for x in e.value:
212-
if isinstance(x, x509.OtherName):
213-
upns.append(x.value[2::])
214-
request.user_principal_names = upns
215-
request.uniform_resource_identifiers = list(
216-
[x.value for x in e.value if isinstance(x, x509.UniformResourceIdentifier)])
217-
if cert.signature_algorithm_oid in (AlgOID.ECDSA_WITH_SHA1, AlgOID.ECDSA_WITH_SHA224, AlgOID.ECDSA_WITH_SHA256,
218-
AlgOID.ECDSA_WITH_SHA384, AlgOID.ECDSA_WITH_SHA512):
219-
request.key_type = (KeyType.ECDSA, KeyType.ALLOWED_CURVES[0])
220-
else:
221-
request.key_type = KeyType(KeyType.RSA, 2048) # todo: make parsing key size
222-
if not request.csr:
223-
request.build_csr()
224-
status, data = self._post(URLS.CERTIFICATE_RENEW,
225-
data={"CertificateDN": request.id, "PKCS10": request.csr})
226-
if status == HTTPStatus.OK:
227-
if "CertificateDN" in data:
228-
request.id = data['CertificateDN']
229-
log.debug("Certificate successfully requested with request id %s." % request.id)
230-
return True
231-
232-
log.error("Request status is not %s. %s." % HTTPStatus.OK, status)
233-
raise CertificateRequestError
234-
235177
@staticmethod
236178
def _parse_zone_config_to_policy(data):
237179
# todo: parse over values to regexps (dont forget tests!)
@@ -284,28 +226,14 @@ def _parse_zone_data_to_object(data):
284226
return z
285227

286228
def read_zone_conf(self, tag):
287-
status, data = self._post(URLS.ZONE_CONFIG, {"PolicyDN": self._get_policy_dn(tag)})
229+
status, data = self._post(URLS.ZONE_CONFIG, {"PolicyDN": self._normalize_zone(tag)})
288230
if status != HTTPStatus.OK:
289231
raise ServerUnexptedBehavior("Server returns %d status on reading zone configuration." % status)
290232
return self._parse_zone_data_to_object(data)
291233

292234
def import_cert(self, request):
293235
raise NotImplementedError
294236

295-
def search_by_thumbprint(self, thumbprint):
296-
"""
297-
:param str thumbprint:
298-
"""
299-
thumbprint = re.sub(r'[^\dabcdefABCDEF]', "", thumbprint)
300-
thumbprint = thumbprint.upper()
301-
status, data = self._get(URLS.CERTIFICATE_SEARCH, params={"Thumbprint": thumbprint})
302-
if status != HTTPStatus.OK:
303-
raise ServerUnexptedBehavior
304-
305-
if not data['Certificates']:
306-
raise ClientBadData("Certificate not found by thumbprint")
307-
return data['Certificates'][0]['DN']
308-
309237
def _read_config_dn(self, dn, attribute_name):
310238
status, data = self._post(URLS.CONFIG_READ_DN, {
311239
"ObjectDN": dn,

0 commit comments

Comments
 (0)