Skip to content

Commit 6dfd901

Browse files
authored
Merge pull request #79 from Venafi/ssh_ca_config
Retrieve SSH CA config
2 parents fe61b78 + 1d79f92 commit 6dfd901

8 files changed

Lines changed: 446 additions & 88 deletions

File tree

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright 2021 Venafi, Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
import logging
18+
from os import environ
19+
20+
from vcert import venafi_connection, Authentication, SCOPE_SSH, VenafiPlatform, SSHCATemplateRequest
21+
22+
logging.basicConfig(level=logging.INFO)
23+
logging.getLogger("urllib3").setLevel(logging.ERROR)
24+
25+
26+
def main():
27+
# Get credentials from environment variables.
28+
url = environ.get('TPP_URL')
29+
ca_dn = environ.get('TPP_SSH_CADN')
30+
ca_guid = environ.get('TPP_SSH_CA_GUID')
31+
# Authentication is required for retrieving the CA principals only.
32+
user = environ.get("TPP_USER")
33+
password = environ.get("TPP_PASSWORD")
34+
35+
# A Connector can be instantiated with no values by using the platform argument.
36+
# url argument is always required for TPP.
37+
connector = venafi_connection(platform=VenafiPlatform.TPP, url=url,
38+
http_request_kwargs={"verify": "/tmp/chain.pem"})
39+
# Optionally, the connector can be instantiated passing the specific arguments:
40+
# connector = venafi_connection(url=url, user=user, password=password, http_request_kwargs={"verify": False})
41+
42+
# If your TPP server certificate is signed with your own CA, or available only via proxy,
43+
# you can specify a trust bundle using requests vars:
44+
# connector = venafi_connection(url=url, api_key=api_key, access_token=access_token,
45+
# http_request_kwargs={"verify": "/path-to/bundle.pem"})
46+
47+
# Create an SSHCATemplateRequest to pass the identifier of the SSH Certificate Authority to retrieve.
48+
# Either CADN or Guid can be used as identifiers.
49+
request = SSHCATemplateRequest(ca_template=ca_dn)
50+
# request = SSHCATemplateRequest(ca_guid=ca_guid)
51+
52+
# Retrieve the public key.
53+
# No Authentication is provided to the Connector so, only the public key is available.
54+
ssh_config = connector.retrieve_ssh_config(ca_request=request)
55+
pub_key_data = ssh_config.ca_public_key
56+
with open("./ca-pub.key", 'w') as ca_file:
57+
ca_file.write(pub_key_data)
58+
59+
# To retrieve the CA principals create an Authentication object with the proper scope to manage SSH certificates.
60+
auth = Authentication(user=user, password=password, scope=SCOPE_SSH)
61+
# Additionally, you may change the default client id for a custom one.
62+
# Make sure this id has been registered on the TPP instance beforehand.
63+
# Also, the user (TTP_USER) should be allowed to use this application
64+
# and the application should have the ssh permissions enabled.
65+
auth.client_id = 'vcert-ssh-ca-pubkey-demo'
66+
# Request an access token.
67+
# After the request is successful, subsequent api calls will use the same token
68+
connector.get_access_token(auth)
69+
# Retrieve SSH Certificate Authority public key and principals
70+
ssh_config = connector.retrieve_ssh_config(ca_request=request)
71+
with open("./ca2-pub.key", 'w') as ca_file:
72+
ca_file.write(pub_key_data)
73+
print("Certificate Authority principals: %s" % ssh_config.ca_principals)
74+
75+
76+
if __name__ == '__main__':
77+
main()

tests/test_ssh.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
from test_env import TPP_TOKEN_URL, TPP_USER, TPP_PASSWORD, TPP_SSH_CADN
2424
from test_utils import timestamp
2525
from vcert import CommonConnection, SSHCertRequest, TPPTokenConnection, Authentication, \
26-
SCOPE_SSH, write_ssh_files, logger
27-
from vcert.ssh_utils import SSHRetrieveResponse, SSHKeyPair
26+
SCOPE_SSH, write_ssh_files, logger, venafi_connection, VenafiPlatform
27+
from vcert.ssh_utils import SSHRetrieveResponse, SSHKeyPair, SSHCATemplateRequest
2828

2929
log = logger.get_child("test-ssh")
3030

@@ -34,7 +34,7 @@
3434

3535
class TestTPPSSHCertificate(unittest.TestCase):
3636
def __init__(self, *args, **kwargs):
37-
self.tpp_conn = TPPTokenConnection(url=TPP_TOKEN_URL, http_request_kwargs={"verify": False})
37+
self.tpp_conn = TPPTokenConnection(url=TPP_TOKEN_URL, http_request_kwargs={"verify": "/tmp/chain.pem"})
3838
auth = Authentication(user=TPP_USER, password=TPP_PASSWORD, scope=SCOPE_SSH)
3939
self.tpp_conn.get_access_token(auth)
4040
super(TestTPPSSHCertificate, self).__init__(*args, **kwargs)
@@ -65,9 +65,25 @@ def test_enroll_service_generated_keypair(self):
6565
self.assertTrue(response.public_key_data, SERVICE_GENERATED_NO_KEY_ERROR % ("Public", "", request.key_id))
6666
self.assertTrue(response.certificate_data, SSH_CERT_DATA_ERROR % request.key_id)
6767

68+
def test_retrieve_ca_public_key(self):
69+
tpp_connector = venafi_connection(platform=VenafiPlatform.TPP, url=TPP_TOKEN_URL,
70+
http_request_kwargs={"verify": "/tmp/chain.pem"})
71+
request = SSHCATemplateRequest(ca_template=TPP_SSH_CADN)
72+
ssh_config = tpp_connector.retrieve_ssh_config(ca_request=request)
73+
self.assertIsNotNone(ssh_config.ca_public_key, "%s Public Key data is empty" % TPP_SSH_CADN)
74+
self.assertIsNone(ssh_config.ca_principals, "%s default principals is not empty" % TPP_SSH_CADN)
75+
log.debug("%s Public Key data:\n%s" % (TPP_SSH_CADN, ssh_config.ca_public_key))
76+
77+
def test_retrieve_ca_public_key_and_principals(self):
78+
request = SSHCATemplateRequest(ca_template=TPP_SSH_CADN)
79+
ssh_config = self.tpp_conn.retrieve_ssh_config(ca_request=request)
80+
self.assertIsNotNone(ssh_config.ca_public_key, "%s Public Key data is empty" % TPP_SSH_CADN)
81+
self.assertIsNotNone(ssh_config.ca_principals, "%s default principals is empty" % TPP_SSH_CADN)
82+
log.debug("%s Public Key data: %s" % (TPP_SSH_CADN, ssh_config.ca_public_key))
83+
log.debug("%s default principals: %s" % (TPP_SSH_CADN, ssh_config.ca_principals))
6884

69-
class TestSSHUtils(unittest.TestCase):
7085

86+
class TestSSHUtils(unittest.TestCase):
7187
def test_write_ssh_files(self):
7288
key_id = _random_key_id()
7389
normalized_name = re.sub(r"[^A-Za-z0-9]+", "_", key_id)

vcert/__init__.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,17 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16-
17-
from .logger import setup_logger, get_logger, get_child
1816
from .common import CertificateRequest, CommonConnection, RevocationRequest, ZoneConfig, CertField, KeyType, \
1917
CustomField, Authentication, SCOPE_CM, SCOPE_PM, SCOPE_SSH, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \
20-
CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE, CHAIN_OPTION_LAST
18+
CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE, CHAIN_OPTION_LAST, VenafiPlatform
2119
from .connection_cloud import CloudConnection
2220
from .connection_tpp import TPPConnection
2321
from .connection_tpp_token import TPPTokenConnection
2422
from .connection_fake import FakeConnection
23+
from .errors import VenafiError
24+
from .logger import setup_logger, get_logger, get_child
2525
from .pem import Certificate
26-
from .ssh_utils import SSHCertRequest, SSHKeyPair, write_ssh_files
26+
from .ssh_utils import SSHCertRequest, SSHKeyPair, write_ssh_files, SSHCATemplateRequest, SSHConfig
2727
from .tpp_utils import IssuerHint
2828

2929
setup_logger()
@@ -54,7 +54,7 @@ def Connection(url=None, token=None, user=None, password=None, fake=False, http_
5454

5555

5656
def venafi_connection(url=None, api_key=None, user=None, password=None, access_token=None, refresh_token=None,
57-
fake=False, http_request_kwargs=None):
57+
fake=False, http_request_kwargs=None, platform=None):
5858
"""
5959
Return connection based on credentials list.
6060
Venafi Platform (TPP) requires URL and access_token (or user and password for getting a new access_token)
@@ -68,14 +68,26 @@ def venafi_connection(url=None, api_key=None, user=None, password=None, access_t
6868
:param str refresh_token: TPP refresh token (optional)
6969
:param bool fake: Use fake connection
7070
:param dict[str, Any] http_request_kwargs: Option for specifying trust bundle or to operate insecurely.
71+
:param VenafiPlatform platform: The platform to be used with the Connector
7172
:rtype CommonConnection:
7273
"""
73-
if fake:
74-
return FakeConnection()
75-
if url and (access_token or refresh_token or (user and password)):
76-
return TPPTokenConnection(url=url, user=user, password=password, access_token=access_token,
77-
refresh_token=refresh_token, http_request_kwargs=http_request_kwargs)
78-
if api_key:
79-
return CloudConnection(token=api_key, url=url, http_request_kwargs=http_request_kwargs)
74+
if platform:
75+
if platform == VenafiPlatform.FAKE:
76+
return FakeConnection()
77+
elif platform == VenafiPlatform.TPP:
78+
return TPPTokenConnection(url=url, user=user, password=password, access_token=access_token,
79+
refresh_token=refresh_token, http_request_kwargs=http_request_kwargs)
80+
elif platform == VenafiPlatform.VAAS:
81+
return CloudConnection(token=api_key, url=url, http_request_kwargs=http_request_kwargs)
82+
else:
83+
raise VenafiError("Invalid Platform: %s. Cannot instantiate a Connector." % platform)
8084
else:
81-
raise Exception("Bad credentials list")
85+
if fake:
86+
return FakeConnection()
87+
if url and (access_token or refresh_token or (user and password)):
88+
return TPPTokenConnection(url=url, user=user, password=password, access_token=access_token,
89+
refresh_token=refresh_token, http_request_kwargs=http_request_kwargs)
90+
if api_key:
91+
return CloudConnection(token=api_key, url=url, http_request_kwargs=http_request_kwargs)
92+
else:
93+
raise VenafiError("Bad credentials list")

vcert/common.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import socket
2323
import sys
2424

25+
import enum
2526
import ipaddress
2627
from builtins import bytes
2728
from cryptography import x509
@@ -36,7 +37,7 @@
3637
from .errors import VenafiConnectionError, ServerUnexptedBehavior, BadData, ClientBadData
3738
from .http import HTTPStatus
3839
from .policy import PolicySpecification
39-
from .ssh_utils import SSHCertRequest, SSHRetrieveResponse
40+
from .ssh_utils import SSHCertRequest, SSHRetrieveResponse, SSHCATemplateRequest, SSHConfig
4041
from .tpp_utils import IssuerHint
4142

4243
MIME_JSON = "application/json"
@@ -691,6 +692,14 @@ def retrieve_ssh_cert(self, request):
691692
"""
692693
raise NotImplementedError
693694

695+
def retrieve_ssh_config(self, ca_request):
696+
"""
697+
698+
:param SSHCATemplateRequest ca_request:
699+
:rtype: SSHConfig
700+
"""
701+
raise NotImplementedError
702+
694703
@staticmethod
695704
def process_server_response(r):
696705
if r.status_code not in (HTTPStatus.OK, HTTPStatus.ACCEPTED, HTTPStatus.CREATED, HTTPStatus.CONFLICT):
@@ -721,3 +730,16 @@ def process_server_response(r):
721730
else:
722731
log.error("Unexpected content type: %s for request %s" % (content_type, r.request.url))
723732
raise ServerUnexptedBehavior
733+
734+
735+
class VenafiPlatform(enum.IntEnum):
736+
def __new__(cls, value, description):
737+
obj = int.__new__(cls, value)
738+
obj._value_ = value
739+
obj.description = description
740+
741+
return obj
742+
743+
FAKE = 100, "Connector for testing purposes"
744+
TPP = 200, "Trust Protection Platfom"
745+
VAAS = 400, "Venafi as a Service"

vcert/connection_tpp.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,28 @@ def __setattr__(self, key, value):
5757
def __str__(self):
5858
return "[TPP] %s" % self._base_url
5959

60+
def get(self, args):
61+
"""
62+
63+
:param dict args:
64+
:rtype: tuple[Any, Any]
65+
"""
66+
url = args[self.ARG_URL] if self.ARG_URL in args else None
67+
params = args[self.ARG_PARAMS] if self.ARG_PARAMS in args else None
68+
69+
return self._get(url=url, params=params)
70+
71+
def post(self, args):
72+
"""
73+
74+
:param dict args:
75+
:rtype: tuple[Any, Any]
76+
"""
77+
url = args[self.ARG_URL] if self.ARG_URL in args else None
78+
data = args[self.ARG_DATA] if self.ARG_DATA in args else None
79+
80+
return self._post(url=url, data=data)
81+
6082
def _get(self, url="", params=None):
6183
if not self._token or self._token[1] < time.time() + 1:
6284
self.auth()
@@ -116,3 +138,8 @@ def _read_config_dn(self, dn, attribute_name):
116138
if status != HTTPStatus.OK:
117139
raise ServerUnexptedBehavior("")
118140
return data
141+
142+
def _is_valid_auth(self):
143+
if self._token:
144+
return True
145+
return False

0 commit comments

Comments
 (0)