Skip to content

Commit 1d79f92

Browse files
committed
- Added test cases for SSH CA public key retrieval
- Added abstract methods for HTTP operations POST and GET on TPP implementations.
1 parent a420283 commit 1d79f92

5 files changed

Lines changed: 132 additions & 26 deletions

File tree

examples/ssh_certificates/retrieve_ca_public_key.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,9 @@
1515
# limitations under the License.
1616
#
1717
import logging
18-
import random
19-
import string
2018
from os import environ
2119

22-
from vcert import venafi_connection, Authentication, SCOPE_SSH, SSHKeyPair, SSHCertRequest, write_ssh_files, \
23-
VenafiPlatform, SSHCATemplateRequest
20+
from vcert import venafi_connection, Authentication, SCOPE_SSH, VenafiPlatform, SSHCATemplateRequest
2421

2522
logging.basicConfig(level=logging.INFO)
2623
logging.getLogger("urllib3").setLevel(logging.ERROR)
@@ -37,7 +34,8 @@ def main():
3734

3835
# A Connector can be instantiated with no values by using the platform argument.
3936
# url argument is always required for TPP.
40-
connector = venafi_connection(platform=VenafiPlatform.TPP, url=url, http_request_kwargs={"verify": False})
37+
connector = venafi_connection(platform=VenafiPlatform.TPP, url=url,
38+
http_request_kwargs={"verify": "/tmp/chain.pem"})
4139
# Optionally, the connector can be instantiated passing the specific arguments:
4240
# connector = venafi_connection(url=url, user=user, password=password, http_request_kwargs={"verify": False})
4341

@@ -72,10 +70,8 @@ def main():
7270
ssh_config = connector.retrieve_ssh_config(ca_request=request)
7371
with open("./ca2-pub.key", 'w') as ca_file:
7472
ca_file.write(pub_key_data)
75-
76-
print("CA principals:\n")
77-
print(ssh_config.ca_principals)
73+
print("Certificate Authority principals: %s" % ssh_config.ca_principals)
7874

7975

8076
if __name__ == '__main__':
81-
main()
77+
main()

tests/test_ssh.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
from test_env import TPP_TOKEN_URL, TPP_USER, TPP_PASSWORD, TPP_SSH_CADN
2424
from test_utils import timestamp
2525
from vcert import CommonConnection, SSHCertRequest, TPPTokenConnection, Authentication, \
26-
SCOPE_SSH, write_ssh_files, logger
27-
from vcert.ssh_utils import SSHRetrieveResponse, SSHKeyPair
26+
SCOPE_SSH, write_ssh_files, logger, venafi_connection, VenafiPlatform
27+
from vcert.ssh_utils import SSHRetrieveResponse, SSHKeyPair, SSHCATemplateRequest
2828

2929
log = logger.get_child("test-ssh")
3030

@@ -34,7 +34,7 @@
3434

3535
class TestTPPSSHCertificate(unittest.TestCase):
3636
def __init__(self, *args, **kwargs):
37-
self.tpp_conn = TPPTokenConnection(url=TPP_TOKEN_URL, http_request_kwargs={"verify": False})
37+
self.tpp_conn = TPPTokenConnection(url=TPP_TOKEN_URL, http_request_kwargs={"verify": "/tmp/chain.pem"})
3838
auth = Authentication(user=TPP_USER, password=TPP_PASSWORD, scope=SCOPE_SSH)
3939
self.tpp_conn.get_access_token(auth)
4040
super(TestTPPSSHCertificate, self).__init__(*args, **kwargs)
@@ -65,9 +65,25 @@ def test_enroll_service_generated_keypair(self):
6565
self.assertTrue(response.public_key_data, SERVICE_GENERATED_NO_KEY_ERROR % ("Public", "", request.key_id))
6666
self.assertTrue(response.certificate_data, SSH_CERT_DATA_ERROR % request.key_id)
6767

68+
def test_retrieve_ca_public_key(self):
69+
tpp_connector = venafi_connection(platform=VenafiPlatform.TPP, url=TPP_TOKEN_URL,
70+
http_request_kwargs={"verify": "/tmp/chain.pem"})
71+
request = SSHCATemplateRequest(ca_template=TPP_SSH_CADN)
72+
ssh_config = tpp_connector.retrieve_ssh_config(ca_request=request)
73+
self.assertIsNotNone(ssh_config.ca_public_key, "%s Public Key data is empty" % TPP_SSH_CADN)
74+
self.assertIsNone(ssh_config.ca_principals, "%s default principals is not empty" % TPP_SSH_CADN)
75+
log.debug("%s Public Key data:\n%s" % (TPP_SSH_CADN, ssh_config.ca_public_key))
76+
77+
def test_retrieve_ca_public_key_and_principals(self):
78+
request = SSHCATemplateRequest(ca_template=TPP_SSH_CADN)
79+
ssh_config = self.tpp_conn.retrieve_ssh_config(ca_request=request)
80+
self.assertIsNotNone(ssh_config.ca_public_key, "%s Public Key data is empty" % TPP_SSH_CADN)
81+
self.assertIsNotNone(ssh_config.ca_principals, "%s default principals is empty" % TPP_SSH_CADN)
82+
log.debug("%s Public Key data: %s" % (TPP_SSH_CADN, ssh_config.ca_public_key))
83+
log.debug("%s default principals: %s" % (TPP_SSH_CADN, ssh_config.ca_principals))
6884

69-
class TestSSHUtils(unittest.TestCase):
7085

86+
class TestSSHUtils(unittest.TestCase):
7187
def test_write_ssh_files(self):
7288
key_id = _random_key_id()
7389
normalized_name = re.sub(r"[^A-Za-z0-9]+", "_", key_id)

vcert/connection_tpp.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,28 @@ def __setattr__(self, key, value):
5757
def __str__(self):
5858
return "[TPP] %s" % self._base_url
5959

60+
def get(self, args):
61+
"""
62+
63+
:param dict args:
64+
:rtype: tuple[Any, Any]
65+
"""
66+
url = args[self.ARG_URL] if self.ARG_URL in args else None
67+
params = args[self.ARG_PARAMS] if self.ARG_PARAMS in args else None
68+
69+
return self._get(url=url, params=params)
70+
71+
def post(self, args):
72+
"""
73+
74+
:param dict args:
75+
:rtype: tuple[Any, Any]
76+
"""
77+
url = args[self.ARG_URL] if self.ARG_URL in args else None
78+
data = args[self.ARG_DATA] if self.ARG_DATA in args else None
79+
80+
return self._post(url=url, data=data)
81+
6082
def _get(self, url="", params=None):
6183
if not self._token or self._token[1] < time.time() + 1:
6284
self.auth()

vcert/connection_tpp_abstract.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -598,18 +598,27 @@ def retrieve_ssh_config(self, ca_request):
598598
value = None
599599
if ca_request.template:
600600
key = 'DN'
601-
value = "%s%s%s" % (CA_ROOT_PATH, PATH_SEPARATOR, ca_request.guid)
601+
value = ca_request.template
602+
if not value.startswith(PATH_SEPARATOR):
603+
value = "%s%s" % (PATH_SEPARATOR, value)
604+
if not value.startswith(CA_ROOT_PATH):
605+
value = "%s%s" % (CA_ROOT_PATH, value)
602606
elif ca_request.guid:
603607
key = 'guid'
604608
value = ca_request.guid
605609
else:
606610
raise ClientBadData("CA Guid or CA template must be provided to retrieve SSH config.")
607611

612+
value = url_parse.quote(value)
608613
query = "%s=%s" % (key, value)
609-
query = url_parse.quote(query)
610614
url = "%s?%s" % (URLS.SSH_CA_PUBLIC_KEY, query)
611615

612-
status, data = self._get(url=url, params=None)
616+
args = {
617+
self.ARG_URL: url,
618+
self.ARG_CHECK_TOKEN: False,
619+
self.ARG_INCLUDE_TOKEN_HEADER: False
620+
}
621+
status, data = self.get(args=args)
613622
if status == HTTPStatus.OK:
614623
ssh_config_response = SSHConfig()
615624
ssh_config_response.ca_public_key = data
@@ -621,6 +630,28 @@ def retrieve_ssh_config(self, ca_request):
621630
raise ServerUnexptedBehavior("Server returns %d status on requesting SSH CA Public Key Data for %s = %s."
622631
% (status, key, value))
623632

633+
ARG_URL = 'url'
634+
ARG_PARAMS = 'params'
635+
ARG_CHECK_TOKEN = 'check_token'
636+
ARG_INCLUDE_TOKEN_HEADER = 'include_token_header'
637+
ARG_DATA = 'data'
638+
639+
def get(self, args):
640+
"""
641+
642+
:param dict args:
643+
:rtype: tuple[Any, Any]
644+
"""
645+
raise NotImplementedError
646+
647+
def post(self, args):
648+
"""
649+
650+
:param dict args:
651+
:rtype: tuple[Any, Any]
652+
"""
653+
raise NotImplementedError
654+
624655
# ======================================== API IMPLEMENTATION ENDS ======================================== #
625656
# ========================================================================================================= #
626657

@@ -876,13 +907,22 @@ def _retrieve_ssh_ca_details(self, ca_request):
876907
"""
877908
json_request = dict()
878909
if ca_request.template:
879-
json_request['DN'] = "%s%s%s" % (CA_ROOT_PATH, PATH_SEPARATOR, ca_request.guid)
910+
value = ca_request.template
911+
if not value.startswith(PATH_SEPARATOR):
912+
value = "%s%s" % (PATH_SEPARATOR, value)
913+
if not value.startswith(CA_ROOT_PATH):
914+
value = "%s%s" % (CA_ROOT_PATH, value)
915+
json_request['DN'] = value
880916
elif ca_request.guid:
881917
json_request['Guid'] = ca_request.guid
882918
else:
883919
raise ClientBadData("CA Guid or CA template must be provided to retrieve SSH CA details.")
884920

885-
status, data = self._post(URLS.SSH_CA_DETAILS, json_request)
921+
args = {
922+
self.ARG_URL: URLS.SSH_CA_DETAILS,
923+
self.ARG_DATA: json_request
924+
}
925+
status, data = self.post(args=args)
886926
if status == HTTPStatus.OK:
887927
response_object = SSHResponse(data['Response'])
888928
if response_object.success:

vcert/connection_tpp_token.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,26 +63,58 @@ def __setattr__(self, key, value):
6363
def __str__(self):
6464
return "[TPP] %s" % self._base_url
6565

66-
def _get(self, url=None, params=None, check_token=True, include_headers=True):
66+
def get(self, args):
67+
"""
68+
69+
:param dict args:
70+
:rtype: tuple[Any, Any]
71+
"""
72+
url = args[self.ARG_URL] if self.ARG_URL in args else None
73+
params = args[self.ARG_PARAMS] if self.ARG_PARAMS in args else None
74+
check_token = args[self.ARG_CHECK_TOKEN] if self.ARG_CHECK_TOKEN in args else True
75+
include_token_header = args[self.ARG_INCLUDE_TOKEN_HEADER] if self.ARG_INCLUDE_TOKEN_HEADER in args else True
76+
77+
return self._get(url=url, params=params, check_token=check_token, include_token_header=include_token_header)
78+
79+
def post(self, args):
80+
"""
81+
82+
:param dict args:
83+
:rtype: tuple[Any, Any]
84+
"""
85+
url = args[self.ARG_URL] if self.ARG_URL in args else None
86+
data = args[self.ARG_DATA] if self.ARG_DATA in args else None
87+
check_token = args[self.ARG_CHECK_TOKEN] if self.ARG_CHECK_TOKEN in args else True
88+
include_token_header = args[self.ARG_INCLUDE_TOKEN_HEADER] if self.ARG_INCLUDE_TOKEN_HEADER in args else True
89+
90+
return self._post(url=url, data=data, check_token=check_token, include_token_header=include_token_header)
91+
92+
def _get(self, url=None, params=None, check_token=True, include_token_header=True):
6793
if check_token:
6894
self._check_token()
6995

70-
headers = {}
71-
if include_headers:
96+
headers = {
97+
'content-type': MIME_JSON,
98+
'cache-control': "no-cache"
99+
}
100+
if include_token_header:
72101
token = self._get_auth_header_value(self._auth.access_token)
73-
headers = {HEADER_AUTHORIZATION: token, 'content-type': MIME_JSON, 'cache-control': 'no-cache'}
102+
headers[HEADER_AUTHORIZATION] = token
74103

75104
r = requests.get(self._base_url + url, headers=headers, params=params, **self._http_request_kwargs)
76105
return self.process_server_response(r)
77106

78-
def _post(self, url=None, data=None, check_token=True, include_headers=True):
107+
def _post(self, url=None, data=None, check_token=True, include_token_header=True):
79108
if check_token:
80109
self._check_token()
81110

82-
headers = {}
83-
if include_headers:
111+
headers = {
112+
'content-type': MIME_JSON,
113+
'cache-control': "no-cache"
114+
}
115+
if include_token_header:
84116
token = self._get_auth_header_value(self._auth.access_token)
85-
headers = {HEADER_AUTHORIZATION: token, 'content-type': MIME_JSON, "cache-control": "no-cache"}
117+
headers[HEADER_AUTHORIZATION] = token
86118

87119
if isinstance(data, dict):
88120
log.debug("POST Request\n\tURL: %s\n\tHeaders:%s\n\tBody:%s\n" % (self._base_url+url, headers, data))

0 commit comments

Comments
 (0)