Skip to content

Commit 8b425d7

Browse files
committed
- Added support for Service generated csr on TPP.
- Added example for new feature.
1 parent 71ba2b6 commit 8b425d7

6 files changed

Lines changed: 243 additions & 121 deletions

File tree

examples/get_cert_service_tpp.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright 2019 Venafi, Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from __future__ import print_function
19+
20+
from vcert import (CertificateRequest, Connection, RevocationRequest, CSR_ORIGIN_SERVICE)
21+
import string
22+
import random
23+
import logging
24+
import time
25+
from os import environ
26+
27+
logging.basicConfig(level=logging.INFO)
28+
logging.getLogger("urllib3").setLevel(logging.ERROR)
29+
30+
31+
def main():
32+
# Get credentials from environment variables
33+
url = environ.get('TPP_TOKEN_URL')
34+
user = environ.get('TPP_USER')
35+
password = environ.get('TPP_PASSWORD')
36+
zone = environ.get("TPP_ZONE")
37+
server_trust_bundle = environ.get('TPP_TRUST_BUNDLE')
38+
39+
40+
# Connection will be chosen automatically based on which arguments are passed.
41+
# If token is passed Venafi Cloud connection will be used.
42+
# If user, password, and URL Venafi Platform (TPP) will be used.
43+
conn = Connection(url=url, user=user, password=password,
44+
http_request_kwargs={"verify": server_trust_bundle})
45+
# If your TPP server certificate signed with your own CA, or available only via proxy, you can specify
46+
# a trust bundle using requests vars:
47+
# conn = Connection(url=url, token=token, user=user, password=password,
48+
# http_request_kwargs={"verify": "/path-to/bundle.pem"})
49+
50+
request = CertificateRequest(common_name=random_word(10) + ".venafi.example.com")
51+
request.csr_origin = CSR_ORIGIN_SERVICE
52+
request.san_dns = ["www.client.venafi.example.com", "ww1.client.venafi.example.com"]
53+
request.email_addresses = ["e1@venafi.example.com", "e2@venafi.example.com"]
54+
request.ip_addresses = ["127.0.0.1", "192.168.1.1"]
55+
request.uniform_resource_identifiers = ["http://wgtest.com","https://ragnartest.com"]
56+
request.user_principal_names = ["e1@venafi.example.com", "e2@venafi.example.com"]
57+
# Specify ordering certificates in chain. Root can be "first" or "last". By default it last. You also can
58+
# specify "ignore" to ignore chain (supported only for Platform).
59+
# To set Custom Fields for the certificate, specify an array of CustomField objects as name-value pairs
60+
# request.custom_fields = [
61+
# CustomField(name="Cost Center", value="ABC123"),
62+
# CustomField(name="Environment", value="Production"),
63+
# CustomField(name="Environment", value="Staging")
64+
# ]
65+
# Update certificate request from zone
66+
zone_config = conn.read_zone_conf(zone)
67+
request.update_from_zone_config(zone_config)
68+
conn.request_cert(request, zone)
69+
70+
# and wait for signing
71+
cert = conn.retrieve_cert(request)
72+
73+
# after that print cert and key
74+
print(cert.full_chain, request.private_key_pem, sep="\n")
75+
# and save into file
76+
f = open("/tmp/cert.pem", "w")
77+
f.write(cert.full_chain)
78+
f = open("/tmp/cert.key", "w")
79+
f.write(request.private_key_pem)
80+
f.close()
81+
82+
print("Trying to renew certificate")
83+
new_request = CertificateRequest(cert_id=request.id)
84+
conn.renew_cert(new_request)
85+
new_cert = conn.retrieve_cert(new_request)
86+
print(new_cert.cert, new_request.private_key_pem, sep="\n")
87+
fn = open("/tmp/new_cert.pem", "w")
88+
fn.write(new_cert.cert)
89+
fn = open("/tmp/new_cert.key", "w")
90+
fn.write(new_request.private_key_pem)
91+
fn.close()
92+
93+
revocation_req = RevocationRequest(req_id=request.id, comments="Just for test")
94+
print("Revoke", conn.revoke_cert(revocation_req))
95+
96+
print("Trying to sign CSR")
97+
csr_pem = open("example-csr.pem", "rb").read()
98+
csr_request = CertificateRequest(csr=csr_pem.decode())
99+
# zone_config = conn.read_zone_conf(zone)
100+
# request.update_from_zone_config(zone_config)
101+
conn.request_cert(csr_request, zone)
102+
103+
# and wait for signing
104+
while True:
105+
cert = conn.retrieve_cert(csr_request)
106+
if cert:
107+
break
108+
else:
109+
time.sleep(5)
110+
111+
# after that print cert and key
112+
print(cert.full_chain)
113+
# and save into file
114+
f = open("/tmp/signed-cert.pem", "w")
115+
f.write(cert.full_chain)
116+
f.close()
117+
118+
119+
def random_word(length):
120+
letters = string.ascii_lowercase
121+
return ''.join(random.choice(letters) for i in range(length))
122+
123+
124+
if __name__ == '__main__':
125+
main()

vcert/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
#
1616

1717
from .common import CertificateRequest, CommonConnection, RevocationRequest, ZoneConfig, CertField, KeyType, \
18-
CustomField, Authentication, SCOPE_CM, SCOPE_PM, SCOPE_SSH
18+
CustomField, Authentication, SCOPE_CM, SCOPE_PM, SCOPE_SSH, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \
19+
CSR_ORIGIN_SERVICE
1920
from .connection_cloud import CloudConnection
2021
from .connection_tpp import TPPConnection
2122
from .connection_tpp_token import TPPTokenConnection

vcert/common.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
MIME_ANY = "*/*"
4747
LOCALHOST = "127.0.0.1"
4848
DEFAULT_TIMEOUT = 180
49+
CSR_ORIGIN_PROVIDED = "provided"
50+
CSR_ORIGIN_LOCAL = "local"
51+
CSR_ORIGIN_SERVICE = "service"
4952

5053

5154
class CertField:
@@ -257,7 +260,8 @@ def __init__(self, cert_id=None,
257260
locality=None,
258261
origin=None,
259262
custom_fields=None,
260-
timeout=DEFAULT_TIMEOUT
263+
timeout=DEFAULT_TIMEOUT,
264+
csr_origin=CSR_ORIGIN_LOCAL
261265
):
262266
"""
263267
:param str cert_id: Certificate request id. Generating by server.
@@ -277,6 +281,7 @@ def __init__(self, cert_id=None,
277281
:param str origin: application identifier
278282
:param list[CustomField] custom_fields: list of custom fields values to be added to the certificate.
279283
:param int timeout: Timeout for the certificate to be retrieved from server. Measured in seconds.
284+
:param str csr_origin: The origin of the CSR, either user provided, locally generated or service generated.
280285
"""
281286

282287
self.chain_option = "last"
@@ -302,6 +307,10 @@ def __init__(self, cert_id=None,
302307
self.locality = locality
303308
# CSR should be last, because it checks subject to match with over parameters
304309
self.csr = csr
310+
if self.csr:
311+
self.csr_origin = CSR_ORIGIN_PROVIDED
312+
else:
313+
self.csr_origin = csr_origin
305314
self.origin = origin or "Venafi VCert-Python"
306315
self.custom_fields = custom_fields
307316
self.cert_guid = None

vcert/connection_tpp.py

Lines changed: 2 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
from cryptography.hazmat.backends import default_backend
2828
from cryptography.x509 import SignatureAlgorithmOID as AlgOID
2929

30-
from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType
30+
from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType, CSR_ORIGIN_LOCAL, CSR_ORIGIN_SERVICE, \
31+
CSR_ORIGIN_PROVIDED
3132
from .connection_tpp_abstract import AbstractTPPConnection, URLS
3233
from .errors import (ServerUnexptedBehavior, ClientBadData, CertificateRequestError, AuthenticationError,
3334
CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError)
@@ -114,51 +115,6 @@ def auth(self):
114115
log.error("Authentication status is not %s but %s. Exiting" % (HTTPStatus.OK, status[0]))
115116
raise AuthenticationError
116117

117-
# TODO: Need to add service generated CSR implementation
118-
def request_cert(self, request, zone):
119-
if not request.csr:
120-
request.build_csr()
121-
request_data = {"PolicyDN": self._get_policy_dn(zone),
122-
"PKCS10": request.csr,
123-
"ObjectName": request.friendly_name,
124-
"DisableAutomaticRenewal": "true"}
125-
if request.origin:
126-
request_data["Origin"] = request.origin
127-
ca_origin = {"Name": "Origin", "Value": request.origin}
128-
if request_data.get("CASpecificAttributes"):
129-
request_data["CASpecificAttributes"].append(ca_origin)
130-
else:
131-
request_data["CASpecificAttributes"] = [ca_origin]
132-
133-
if request.custom_fields:
134-
custom_fields_map = {}
135-
for c_field in request.custom_fields:
136-
if custom_fields_map.get(c_field.name):
137-
custom_fields_map[c_field.name].append(c_field.value)
138-
else:
139-
custom_fields_map[c_field.name] = [c_field.value]
140-
141-
for key in custom_fields_map:
142-
custom_field_json = {
143-
"Name": key,
144-
"Values": custom_fields_map[key]
145-
}
146-
if request_data.get("CustomFields"):
147-
request_data["CustomFields"].append(custom_field_json)
148-
else:
149-
request_data["CustomFields"] = [custom_field_json]
150-
151-
status, data = self._post(URLS.CERTIFICATE_REQUESTS, data=request_data)
152-
if status == HTTPStatus.OK:
153-
request.id = data['CertificateDN']
154-
request.cert_guid = data['Guid']
155-
log.debug("Certificate successfully requested with request id %s." % request.id)
156-
log.debug("Certificate successfully requested with GUID %s." % request.cert_guid)
157-
return True
158-
159-
log.error("Request status is not %s. %s." % HTTPStatus.OK, status)
160-
raise CertificateRequestError
161-
162118
def retrieve_cert(self, cert_request):
163119
log.debug("Getting certificate status for id %s" % cert_request.id)
164120

@@ -336,19 +292,6 @@ def read_zone_conf(self, tag):
336292
def import_cert(self, request):
337293
raise NotImplementedError
338294

339-
@staticmethod
340-
def _get_policy_dn(zone):
341-
if zone is None:
342-
log.error("Bad zone: %s" % zone)
343-
raise ClientBadData
344-
if re.match(r"^\\\\VED\\\\Policy", zone):
345-
return zone
346-
else:
347-
if re.match(r"^\\\\", zone):
348-
return r"\\VED\\Policy" + zone
349-
else:
350-
return r"\\VED\\Policy\\" + zone
351-
352295
def search_by_thumbprint(self, thumbprint):
353296
"""
354297
:param str thumbprint:

vcert/connection_tpp_abstract.py

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
import time
1919
from pprint import pprint
2020

21-
from vcert.common import CertField, CommonConnection
22-
from vcert.errors import VenafiError, ServerUnexptedBehavior, ClientBadData, RetrieveCertificateTimeoutError
21+
from vcert.common import CertField, CommonConnection, CertificateRequest, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \
22+
CSR_ORIGIN_SERVICE
23+
from vcert.errors import VenafiError, ServerUnexptedBehavior, ClientBadData, RetrieveCertificateTimeoutError, \
24+
CertificateRequestError
2325
from vcert.http import HTTPStatus
2426
from vcert.policy import RPA, SPA
2527
from vcert.policy.pm_tpp import TPPPolicy, is_service_generated_csr, SetAttrResponse, validate_policy_spec, \
@@ -66,6 +68,61 @@ def __init__(self):
6668

6769
class AbstractTPPConnection(CommonConnection):
6870

71+
def request_cert(self, request, zone):
72+
request_data = {'PolicyDN': self._get_policy_dn(zone),
73+
'ObjectName': request.friendly_name,
74+
'DisableAutomaticRenewal': "true"
75+
}
76+
77+
if request.csr_origin == CSR_ORIGIN_LOCAL:
78+
request.build_csr()
79+
80+
if request.csr_origin in [CSR_ORIGIN_PROVIDED, CSR_ORIGIN_LOCAL]:
81+
request_data['PKCS10'] = request.csr,
82+
elif request.csr_origin == CSR_ORIGIN_SERVICE:
83+
request_data['Subject'] = request.common_name
84+
request_data['SubjectAltNames'] = self.wrap_alt_names(request)
85+
else:
86+
log.error("CSR Origin option [%s] is not valid. " % request.csr_origin)
87+
raise ClientBadData
88+
89+
if request.origin:
90+
request_data["Origin"] = request.origin
91+
ca_origin = {"Name": "Origin", "Value": request.origin}
92+
if request_data.get("CASpecificAttributes"):
93+
request_data["CASpecificAttributes"].append(ca_origin)
94+
else:
95+
request_data["CASpecificAttributes"] = [ca_origin]
96+
97+
if request.custom_fields:
98+
custom_fields_map = {}
99+
for c_field in request.custom_fields:
100+
if custom_fields_map.get(c_field.name):
101+
custom_fields_map[c_field.name].append(c_field.value)
102+
else:
103+
custom_fields_map[c_field.name] = [c_field.value]
104+
105+
for key in custom_fields_map:
106+
custom_field_json = {
107+
"Name": key,
108+
"Values": custom_fields_map[key]
109+
}
110+
if request_data.get("CustomFields"):
111+
request_data["CustomFields"].append(custom_field_json)
112+
else:
113+
request_data["CustomFields"] = [custom_field_json]
114+
115+
status, data = self._post(URLS.CERTIFICATE_REQUESTS, data=request_data)
116+
if status == HTTPStatus.OK:
117+
request.id = data['CertificateDN']
118+
request.cert_guid = data['Guid']
119+
log.debug("Certificate successfully requested with request id %s." % request.id)
120+
log.debug("Certificate successfully requested with GUID %s." % request.cert_guid)
121+
return True
122+
123+
log.error("Request status is not %s. %s." % HTTPStatus.OK, status)
124+
raise CertificateRequestError
125+
69126
def get_policy(self, zone):
70127
# get policy spec from name
71128
policy_name = self._normalize_zone(zone)
@@ -457,3 +514,48 @@ def _get_policy_parent(zone):
457514
# Return a substring of zone that starts at 0 and ends at index. Zone here is treated as an slice
458515
# zone[0:index] returns the same result
459516
return zone[:index]
517+
518+
@staticmethod
519+
def _get_policy_dn(zone):
520+
if zone is None:
521+
log.error("Bad zone: %s" % zone)
522+
raise ClientBadData
523+
if re.match(r"^\\\\VED\\\\Policy", zone):
524+
return zone
525+
else:
526+
if re.match(r"^\\\\", zone):
527+
return r"\\VED\\Policy" + zone
528+
else:
529+
return r"\\VED\\Policy\\" + zone
530+
531+
def wrap_alt_names(self, request):
532+
"""
533+
534+
:param CertificateRequest request:
535+
:rtype: list[dict]
536+
"""
537+
items = []
538+
for value in request.user_principal_names:
539+
items.append(self._create_san_item(0, value))
540+
for value in request.email_addresses:
541+
items.append(self._create_san_item(1, value))
542+
for value in request.san_dns:
543+
items.append(self._create_san_item(2, value))
544+
for value in request.uniform_resource_identifiers:
545+
items.append(self._create_san_item(6, value))
546+
for value in request.ip_addresses:
547+
items.append(self._create_san_item(7, value))
548+
return items
549+
550+
@staticmethod
551+
def _create_san_item(san_type, value):
552+
"""
553+
554+
:param int san_type:
555+
:param str value:
556+
:return: dict
557+
"""
558+
return {
559+
'Type': san_type,
560+
'Name': value
561+
}

0 commit comments

Comments
 (0)