-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathup.py
More file actions
221 lines (164 loc) · 5.67 KB
/
up.py
File metadata and controls
221 lines (164 loc) · 5.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
from pydantic import BaseModel, model_validator, ValidationError, Field
from typing import Union, Literal, Optional, List, Annotated
from functools import reduce
from cf_remote import log
import cfengine_cli.validate as validate
from cfengine_cli.utils import UserError
# Forces pydantic to throw validation error if config contains unknown keys
class NoExtra(BaseModel, extra="forbid"):
pass
class Config(NoExtra):
pass
class AWSConfig(Config):
image: str
size: Literal["micro", "xlarge"] = "micro"
@model_validator(mode="after")
def check_aws_config(self):
validate.validate_aws_image(self.image)
return self
class VagrantConfig(Config):
box: str
memory: int = 512
cpus: int = 1
sync_folder: Optional[str] = None
provision: Optional[str] = None
@model_validator(mode="after")
def check_vagrant_config(self):
if self.memory < 512:
raise UserError("Cannot allocate less than 512MB to a Vagrant VM")
if self.cpus < 1:
raise UserError("Cannot use less than 1 cpu per Vagrant VM")
validate.validate_vagrant_box(self.box)
return self
class GCPConfig(Config):
image: str # There is no list of available GCP platforms to validate against yet
network: Optional[str] = None
public_ip: bool = True
size: str = "n1-standard-1"
class AWSProvider(Config):
provider: Literal["aws"]
aws: AWSConfig
@model_validator(mode="after")
def check_aws_provider(self):
validate.validate_aws_credentials()
return self
class GCPProvider(Config):
provider: Literal["gcp"]
gcp: GCPConfig
@model_validator(mode="after")
def check_gcp_provider(self):
validate.validate_gcp_credentials()
return self
class VagrantProvider(Config):
provider: Literal["vagrant"]
vagrant: VagrantConfig
class SaveMode(Config):
mode: Literal["save"]
hosts: List[str]
class SpawnMode(Config):
mode: Literal["spawn"]
# "Field" forces pydantic to report errors on the branch defined by the field "mode"
spawn: Annotated[
Union[VagrantProvider, AWSProvider, GCPProvider],
Field(discriminator="provider"),
]
count: int
@model_validator(mode="after")
def check_spawn_config(self):
if self.count < 1:
raise UserError("Cannot spawn less than 1 instance")
return self
class CFEngineConfig(Config):
version: Optional[str] = None
bootstrap: Optional[str] = None
edition: Literal["community", "enterprise"] = "enterprise"
remote_download: bool = False
hub_package: Optional[str] = None
client_package: Optional[str] = None
package: Optional[str] = None
demo: bool = False
@model_validator(mode="after")
def check_cfengine_config(self):
packages = [self.package, self.hub_package, self.client_package]
for p in packages:
validate.validate_package(p, self.remote_download)
if self.version and any(packages):
log.warning("Specifying package overrides cfengine version")
validate.validate_version(self.version, self.edition)
validate.validate_state_bootstrap(self.bootstrap)
return self
class GroupConfig(Config):
role: Literal["client", "hub"]
# "Field" forces pydantic to report errors on the branch defined by the field "provider"
source: Annotated[Union[SaveMode, SpawnMode], Field(discriminator="mode")]
cfengine: Optional[CFEngineConfig] = None
scripts: Optional[List[str]] = None
@model_validator(mode="after")
def check_group_config(self):
if (
self.role == "hub"
and self.source.mode == "spawn"
and self.source.count != 1
):
raise UserError("A hub can only have one host")
return self
def rgetattr(obj, attr, *args):
def _getattr(obj, attr):
return getattr(obj, attr, *args)
return reduce(_getattr, [obj] + attr.split("."))
class Group:
"""
All group-specific data:
- Vagrantfile
Config that declares it:
- provider, count, cfengine version, role, ...
"""
def __init__(self, config: GroupConfig):
self.config = config
self.hosts = []
class Host:
"""
All host-specific data:
- user, ip, ssh config, OS, uuid, ...
"""
def __init__(self):
pass
def _resolve_templates(parent, templates):
if not parent:
return
if isinstance(parent, dict):
for key, value in parent.items():
if isinstance(value, str) and value in templates:
parent[key] = templates[value]
else:
_resolve_templates(value, templates)
if isinstance(parent, list):
for value in parent:
_resolve_templates(value, templates)
def validate_config(content):
if not content:
raise UserError("Empty spawn config")
if "groups" not in content:
raise UserError("Missing 'groups' key in spawn config")
groups = content["groups"]
templates = content.get("templates")
if templates:
_resolve_templates(groups, templates)
if not isinstance(groups, list):
groups = [groups]
state = {}
try:
for g in groups:
if len(g) != 1:
raise UserError(
f"Too many keys in group definition: {", ".join(list(g.keys()))}"
)
for k, v in g.items():
state[k] = Group(GroupConfig(**v))
except ValidationError as v:
msgs = []
for err in v.errors():
msgs.append(
f"{err["msg"]}. Input '{err["input"]}' at location '{err["loc"]}'"
)
raise UserError("\n".join(msgs))