Skip to content

Commit 12ecdcf

Browse files
authored
Merge pull request #73 from Venafi/service_generated_csr
Service generated CSR
2 parents f46d60f + eda2d39 commit 12ecdcf

8 files changed

Lines changed: 85 additions & 103 deletions

File tree

examples/get_cert_service_tpp.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ def main():
5252
request.ip_addresses = ["127.0.0.1", "192.168.1.1"]
5353
request.uniform_resource_identifiers = ["http://wgtest.uri.com","https://ragnartest.uri.com"]
5454
request.user_principal_names = ["upn1@venafi.example.com", "upn2@venafi.example.com"]
55+
# Specify whether or not to return the private key. It is False by default.
56+
# A password should be defined for the private key if include_private_key is True.
57+
request.include_private_key = True
58+
request.key_password = 'Foo.Bar.Pass.123!'
5559
# Specify ordering certificates in chain. Root can be CHAIN_OPTION_FIRST ("first")
5660
# or CHAIN_OPTION_LAST ("last"). By default it is CHAIN_OPTION_LAST.
5761
# You can also specify CHAIN_OPTION_IGNORE ("ignore") to ignore chain (supported only for TPP).

tests/test_e2e.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,8 @@ def test_tpp_token_enroll(self):
290290
def test_tpp_token_enroll_with_service_generated_csr(self):
291291
cn = random_word(10) + ".venafi.example.com"
292292
try:
293-
_, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, service_generated_csr=True)
293+
_, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, password="FooBarPass123",
294+
service_generated_csr=True)
294295
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
295296
self.assertEqual(cert_config["Origin"], "Venafi VCert-Python")
296297
except Exception as err:
@@ -527,18 +528,21 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None
527528
request.csr = csr
528529
elif service_generated_csr:
529530
request.csr_origin = CSR_ORIGIN_SERVICE
531+
request.include_private_key = True
530532

531533
conn.request_cert(request, zone)
532534
cert = conn.retrieve_cert(request)
533535
# print("Certificate is:\n %s" % cert_pem)
534536
# print("Private key is:\n %s:" % request.private_key_pem)
535537
# and save into file
536-
f = open("./cert.pem", "w")
537-
f.write(cert.full_chain)
538-
if not service_generated_csr:
539-
f = open("./cert.key", "w")
540-
f.write(request.private_key_pem)
541-
f.close()
538+
with open("./cert.pem", "w") as f:
539+
f.write(cert.full_chain)
540+
with open("./cert.key", "w") as f2:
541+
if request.include_private_key:
542+
assert cert.key is not None
543+
f2.write(cert.key)
544+
else:
545+
f2.write(request.private_key_pem)
542546

543547
cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
544548
assert isinstance(cert, x509.Certificate)

vcert/common.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
from .policy import PolicySpecification
3939
from .ssh_utils import SSHCertRequest, SSHRetrieveResponse
4040

41-
4241
MIME_JSON = "application/json"
4342
MIME_HTML = "text/html"
4443
MIME_TEXT = "text/plain"
@@ -264,7 +263,8 @@ def __init__(self, cert_id=None,
264263
origin=None,
265264
custom_fields=None,
266265
timeout=DEFAULT_TIMEOUT,
267-
csr_origin=CSR_ORIGIN_LOCAL
266+
csr_origin=CSR_ORIGIN_LOCAL,
267+
include_private_key=False
268268
):
269269
"""
270270
:param str cert_id: Certificate request id. Generating by server.
@@ -285,6 +285,7 @@ def __init__(self, cert_id=None,
285285
:param list[CustomField] custom_fields: list of custom fields values to be added to the certificate.
286286
:param int timeout: Timeout for the certificate to be retrieved from server. Measured in seconds.
287287
:param str csr_origin: The origin of the CSR, either user provided, locally generated or service generated.
288+
:param bool include_private_key: Indicates if the private key should be returned by the server or not.
288289
"""
289290

290291
self.chain_option = CHAIN_OPTION_LAST # "last"
@@ -318,6 +319,7 @@ def __init__(self, cert_id=None,
318319
self.custom_fields = custom_fields
319320
self.cert_guid = None
320321
self.timeout = timeout
322+
self.include_private_key = include_private_key
321323

322324
def __setattr__(self, key, value):
323325
if key == "key_password":

vcert/connection_cloud.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,11 @@ def retrieve_cert(self, request):
356356
status = 0
357357
if status == HTTPStatus.OK:
358358
log.debug("Certificate found, parsing response...")
359-
return parse_pem(data, request.chain_option)
359+
cert_response = parse_pem(data, request.chain_option)
360+
if cert_response.key is None and request.private_key is not None:
361+
log.debug("Adding local private key to response...")
362+
cert_response.key = request.private_key_pem
363+
return cert_response
360364
elif (time.time() - time_start) < request.timeout:
361365
log.debug("Waiting for certificate...")
362366
time.sleep(2)

vcert/connection_fake.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ def retrieve_cert(self, certificate_request):
179179
).sign(root_ca_private_key, hashes.SHA256(), default_backend())
180180

181181
log.info("This certificate is for test purposes only. Don't use it in production!")
182-
return parse_pem(cert.public_bytes(serialization.Encoding.PEM).decode(), certificate_request.chain_option)
182+
response = parse_pem(cert.public_bytes(serialization.Encoding.PEM).decode(), certificate_request.chain_option)
183+
response.key = certificate_request.private_key_pem
184+
return response
183185

184186
def revoke_cert(self, request):
185187
raise NotImplementedError

vcert/connection_tpp.py

Lines changed: 2 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,16 @@
1717
from __future__ import (absolute_import, division, generators, unicode_literals, print_function, nested_scopes,
1818
with_statement)
1919

20-
import base64
2120
import logging as log
2221
import re
2322
import time
2423

2524
import requests
26-
from cryptography import x509
27-
from cryptography.hazmat.backends import default_backend
28-
from cryptography.x509 import SignatureAlgorithmOID as AlgOID
2925

30-
from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType, CSR_ORIGIN_LOCAL, CSR_ORIGIN_SERVICE, \
31-
CSR_ORIGIN_PROVIDED
26+
from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType
3227
from .connection_tpp_abstract import AbstractTPPConnection, URLS
33-
from .errors import (ServerUnexptedBehavior, ClientBadData, CertificateRequestError, AuthenticationError,
34-
CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError)
28+
from .errors import (ServerUnexptedBehavior, ClientBadData, AuthenticationError)
3529
from .http import HTTPStatus
36-
from .pem import parse_pem
3730

3831
TOKEN_HEADER_NAME = "x-venafi-api-key" # nosec
3932

@@ -115,43 +108,6 @@ def auth(self):
115108
log.error("Authentication status is not %s but %s. Exiting" % (HTTPStatus.OK, status[0]))
116109
raise AuthenticationError
117110

118-
def retrieve_cert(self, cert_request):
119-
log.debug("Getting certificate status for id %s" % cert_request.id)
120-
121-
retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", IncludeChain='true')
122-
123-
if cert_request.chain_option == "last":
124-
retrieve_request['RootFirstOrder'] = 'false'
125-
retrieve_request['IncludeChain'] = 'true'
126-
elif cert_request.chain_option == "first":
127-
retrieve_request['RootFirstOrder'] = 'true'
128-
retrieve_request['IncludeChain'] = 'true'
129-
elif cert_request.chain_option == "ignore":
130-
retrieve_request['IncludeChain'] = 'false'
131-
else:
132-
log.error("chain option %s is not valid" % cert_request.chain_option)
133-
raise ClientBadData
134-
135-
time_start = time.time()
136-
while True:
137-
try:
138-
status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request)
139-
except VenafiError:
140-
log.debug("Certificate with id %s not found." % cert_request.id)
141-
status = 0
142-
143-
if status == HTTPStatus.OK:
144-
pem64 = data['CertificateData']
145-
pem = base64.b64decode(pem64)
146-
return parse_pem(pem.decode(), cert_request.chain_option)
147-
elif (time.time() - time_start) < cert_request.timeout:
148-
log.debug("Waiting for certificate...")
149-
time.sleep(2)
150-
else:
151-
raise RetrieveCertificateTimeoutError(
152-
'Operation timed out at %d seconds while retrieving certificate with id %s'
153-
% (cert_request.timeout, cert_request.id))
154-
155111
def revoke_cert(self, request):
156112
if not (request.id or request.thumbprint):
157113
raise ClientBadData

vcert/connection_tpp_abstract.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,19 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16+
import base64
1617
import logging as log
1718
import re
1819
import time
20+
from pprint import pprint
21+
1922
from cryptography import x509
2023
from cryptography.hazmat.backends import default_backend
2124
from cryptography.x509 import SignatureAlgorithmOID as AlgOID
22-
from pprint import pprint
2325

26+
from vcert.pem import parse_pem
2427
from vcert.common import CertField, CommonConnection, CertificateRequest, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \
25-
CSR_ORIGIN_SERVICE, KeyType
28+
CSR_ORIGIN_SERVICE, KeyType, CHAIN_OPTION_LAST, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE
2629
from vcert.errors import VenafiError, ServerUnexptedBehavior, ClientBadData, RetrieveCertificateTimeoutError, \
2730
CertificateRequestError, CertificateRenewError
2831
from vcert.http import HTTPStatus
@@ -126,6 +129,56 @@ def request_cert(self, request, zone):
126129
log.error("Request status is not %s. %s." % HTTPStatus.OK, status)
127130
raise CertificateRequestError
128131

132+
def retrieve_cert(self, cert_request):
133+
log.debug("Getting certificate status for id %s" % cert_request.id)
134+
135+
retrieve_request = dict(CertificateDN=cert_request.id,
136+
Format="base64",
137+
IncludeChain=True)
138+
139+
if cert_request.csr_origin == CSR_ORIGIN_SERVICE:
140+
retrieve_request['IncludePrivateKey'] = cert_request.include_private_key
141+
if cert_request.key_password:
142+
# The password is encoded when assigned (for local use, I suppose).
143+
# decode is needed to send a raw string
144+
retrieve_request['Password'] = cert_request.key_password.decode()
145+
146+
if cert_request.chain_option == CHAIN_OPTION_LAST:
147+
retrieve_request['RootFirstOrder'] = 'false'
148+
retrieve_request['IncludeChain'] = 'true'
149+
elif cert_request.chain_option == CHAIN_OPTION_FIRST:
150+
retrieve_request['RootFirstOrder'] = 'true'
151+
retrieve_request['IncludeChain'] = 'true'
152+
elif cert_request.chain_option == CHAIN_OPTION_IGNORE:
153+
retrieve_request['IncludeChain'] = 'false'
154+
else:
155+
log.error("chain option %s is not valid" % cert_request.chain_option)
156+
raise ClientBadData
157+
158+
time_start = time.time()
159+
while True:
160+
try:
161+
status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request)
162+
except VenafiError:
163+
log.debug("Certificate with id %s not found." % cert_request.id)
164+
status = 0
165+
166+
if status == HTTPStatus.OK:
167+
pem64 = data['CertificateData']
168+
pem = base64.b64decode(pem64)
169+
cert_response = parse_pem(pem.decode(), cert_request.chain_option)
170+
if cert_response.key is None and cert_request.private_key is not None:
171+
log.debug("Adding private key to response...")
172+
cert_response.key = cert_request.private_key_pem
173+
return cert_response
174+
elif (time.time() - time_start) < cert_request.timeout:
175+
log.debug("Waiting for certificate...")
176+
time.sleep(2)
177+
else:
178+
raise RetrieveCertificateTimeoutError(
179+
'Operation timed out at %d seconds while retrieving certificate with id %s'
180+
% (cert_request.timeout, cert_request.id))
181+
129182
def renew_cert(self, request, reuse_key=False):
130183
if not request.id and not request.thumbprint:
131184
log.debug("Request id or thumbprint must be specified for TPP")

vcert/connection_tpp_token.py

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,16 @@
1818
from __future__ import (absolute_import, division, generators, unicode_literals, print_function, nested_scopes,
1919
with_statement)
2020

21-
import base64
2221
import logging as log
2322
import re
2423
import time
2524

2625
import requests
27-
from cryptography import x509
28-
from cryptography.hazmat.backends import default_backend
29-
from cryptography.x509 import SignatureAlgorithmOID as AlgOID
3026

3127
from .common import MIME_JSON, TokenInfo, Authentication, KeyType, Policy, ZoneConfig, CertField
3228
from .connection_tpp_abstract import AbstractTPPConnection, URLS
33-
from .errors import (ClientBadData, ServerUnexptedBehavior, AuthenticationError, CertificateRequestError,
34-
CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError)
29+
from .errors import (ClientBadData, ServerUnexptedBehavior, AuthenticationError)
3530
from .http import HTTPStatus
36-
from .pem import parse_pem
3731

3832
HEADER_AUTHORIZATION = "Authorization" # type: str
3933

@@ -129,43 +123,6 @@ def auth(self):
129123
def import_cert(self, request):
130124
raise NotImplementedError
131125

132-
def retrieve_cert(self, cert_request):
133-
log.debug("Getting certificate status for id %s" % cert_request.id)
134-
135-
retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", IncludeChain='true')
136-
137-
if cert_request.chain_option == "last":
138-
retrieve_request['RootFirstOrder'] = 'false'
139-
retrieve_request['IncludeChain'] = 'true'
140-
elif cert_request.chain_option == "first":
141-
retrieve_request['RootFirstOrder'] = 'true'
142-
retrieve_request['IncludeChain'] = 'true'
143-
elif cert_request.chain_option == "ignore":
144-
retrieve_request['IncludeChain'] = 'false'
145-
else:
146-
log.error("chain option %s is not valid" % cert_request.chain_option)
147-
raise ClientBadData
148-
149-
time_start = time.time()
150-
while True:
151-
try:
152-
status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request)
153-
except VenafiError:
154-
log.debug("Certificate with id %s not found." % cert_request.id)
155-
status = 0
156-
157-
if status == HTTPStatus.OK:
158-
pem64 = data['CertificateData']
159-
pem = base64.b64decode(pem64)
160-
return parse_pem(pem.decode(), cert_request.chain_option)
161-
elif (time.time() - time_start) < cert_request.timeout:
162-
log.debug("Waiting for certificate...")
163-
time.sleep(2)
164-
else:
165-
raise RetrieveCertificateTimeoutError(
166-
'Operation timed out at %d seconds while retrieving certificate with id %s'
167-
% (cert_request.timeout, cert_request.id))
168-
169126
def revoke_cert(self, request):
170127
if not (request.id or request.thumbprint):
171128
raise ClientBadData

0 commit comments

Comments
 (0)