Skip to content

Commit b6a0046

Browse files
authored
Merge pull request #11 from tinybirdco/support-multiline-connection-directives
Support multiline directives in connection file parser
2 parents dbfbc1f + f02f619 commit b6a0046

6 files changed

Lines changed: 140 additions & 5 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "tinybird-sdk"
3-
version = "0.1.3"
3+
version = "0.1.4"
44
description = "Python SDK for Tinybird Forward"
55
readme = "README.md"
66
authors = [

src/tinybird_sdk/generator/connection.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ def _generate_kafka_connection(connection: KafkaConnectionDefinition) -> str:
3434
if options.schema_registry_url:
3535
parts.append(f"KAFKA_SCHEMA_REGISTRY_URL {options.schema_registry_url}")
3636
if options.ssl_ca_pem:
37-
parts.append(f"KAFKA_SSL_CA_PEM {options.ssl_ca_pem}")
37+
if "\n" in options.ssl_ca_pem:
38+
indented = "\n".join(f" {line}" for line in options.ssl_ca_pem.split("\n"))
39+
parts.append(f"KAFKA_SSL_CA_PEM >\n{indented}")
40+
else:
41+
parts.append(f"KAFKA_SSL_CA_PEM {options.ssl_ca_pem}")
3842

3943
return "\n".join(parts)
4044

src/tinybird_sdk/migrate/parse_connection.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,31 @@
11
from __future__ import annotations
22

3-
from .parser_utils import MigrationParseError, is_blank, parse_directive_line, parse_quoted_value, split_lines
3+
from .parser_utils import MigrationParseError, is_blank, parse_directive_line, parse_quoted_value, read_directive_block, split_lines
44
from .types import ConnectionModel, GCSConnectionModel, KafkaConnectionModel, ResourceFile, S3ConnectionModel
55

6+
CONNECTION_DIRECTIVES = {
7+
"TYPE",
8+
"KAFKA_BOOTSTRAP_SERVERS",
9+
"KAFKA_SECURITY_PROTOCOL",
10+
"KAFKA_SASL_MECHANISM",
11+
"KAFKA_KEY",
12+
"KAFKA_SECRET",
13+
"KAFKA_SCHEMA_REGISTRY_URL",
14+
"KAFKA_SSL_CA_PEM",
15+
"S3_REGION",
16+
"S3_ARN",
17+
"S3_ACCESS_KEY",
18+
"S3_SECRET",
19+
"GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON",
20+
}
21+
22+
23+
def _is_connection_directive_line(line: str) -> bool:
24+
if not line:
25+
return False
26+
directive = parse_directive_line(line)
27+
return directive["key"] in CONNECTION_DIRECTIVES
28+
629

730
def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
831
lines = split_lines(resource.content)
@@ -22,9 +45,19 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
2245
access_secret: str | None = None
2346
service_account_credentials_json: str | None = None
2447

25-
for raw_line in lines:
48+
i = 0
49+
while i < len(lines):
50+
raw_line = lines[i]
2651
line = raw_line.strip()
52+
2753
if is_blank(line) or line.startswith("#"):
54+
i += 1
55+
continue
56+
57+
if line == "KAFKA_SSL_CA_PEM >":
58+
block, next_index = read_directive_block(lines, i + 1, _is_connection_directive_line)
59+
ssl_ca_pem = "\n".join(block)
60+
i = next_index
2861
continue
2962

3063
directive = parse_directive_line(line)
@@ -81,6 +114,8 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
81114
f'Unsupported connection directive in strict mode: "{line}"',
82115
)
83116

117+
i += 1
118+
84119
if not connection_type:
85120
raise MigrationParseError(resource.file_path, "connection", resource.name, "TYPE directive is required.")
86121

tests/test_phase1_schema_generator_parity.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,36 @@ def test_generate_connection_includes_kafka_schema_registry_url() -> None:
2020
assert "KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com" in generated.content
2121

2222

23+
def test_generate_connection_emits_multiline_ssl_ca_pem() -> None:
24+
pem = "-----BEGIN CERTIFICATE-----\nMIIDXTCCAkWgAwIBAgIJAM\n-----END CERTIFICATE-----"
25+
kafka = define_kafka_connection(
26+
"broker",
27+
{
28+
"bootstrap_servers": "localhost:9092",
29+
"ssl_ca_pem": pem,
30+
},
31+
)
32+
33+
generated = generate_connection(kafka)
34+
assert "KAFKA_SSL_CA_PEM >" in generated.content
35+
assert " -----BEGIN CERTIFICATE-----" in generated.content
36+
assert " MIIDXTCCAkWgAwIBAgIJAM" in generated.content
37+
assert " -----END CERTIFICATE-----" in generated.content
38+
39+
40+
def test_generate_connection_emits_single_line_ssl_ca_pem() -> None:
41+
kafka = define_kafka_connection(
42+
"broker",
43+
{
44+
"bootstrap_servers": "localhost:9092",
45+
"ssl_ca_pem": "{{ tb_secret('KAFKA_SSL_CA_PEM') }}",
46+
},
47+
)
48+
49+
generated = generate_connection(kafka)
50+
assert "KAFKA_SSL_CA_PEM {{ tb_secret('KAFKA_SSL_CA_PEM') }}" in generated.content
51+
52+
2353
def test_generate_datasource_includes_indexes_and_store_raw_value() -> None:
2454
kafka = define_kafka_connection("broker", {"bootstrap_servers": "localhost:9092"})
2555
datasource = define_datasource(

tests/test_phase3_migrate_parser_parity.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,72 @@ def test_parse_connection_supports_single_quoted_type_and_schema_registry() -> N
4444
assert parsed.schema_registry_url == "https://registry.example.com"
4545

4646

47+
def test_parse_connection_supports_multiline_ssl_ca_pem() -> None:
48+
parsed = parse_connection_file(
49+
_resource(
50+
"connection",
51+
"broker",
52+
"\n".join(
53+
[
54+
"TYPE kafka",
55+
"KAFKA_BOOTSTRAP_SERVERS localhost:9092",
56+
"KAFKA_SECURITY_PROTOCOL SASL_SSL",
57+
"KAFKA_SSL_CA_PEM >",
58+
" -----BEGIN CERTIFICATE-----",
59+
" MIIDXTCCAkWgAwIBAgIJAM",
60+
" -----END CERTIFICATE-----",
61+
]
62+
),
63+
)
64+
)
65+
66+
assert parsed.connection_type == "kafka"
67+
assert parsed.ssl_ca_pem == "-----BEGIN CERTIFICATE-----\nMIIDXTCCAkWgAwIBAgIJAM\n-----END CERTIFICATE-----"
68+
69+
70+
def test_parse_connection_supports_multiline_ssl_ca_pem_with_following_directives() -> None:
71+
parsed = parse_connection_file(
72+
_resource(
73+
"connection",
74+
"broker",
75+
"\n".join(
76+
[
77+
"TYPE kafka",
78+
"KAFKA_BOOTSTRAP_SERVERS localhost:9092",
79+
"KAFKA_SSL_CA_PEM >",
80+
" -----BEGIN CERTIFICATE-----",
81+
" MIIDXTCCAkWgAwIBAgIJAM",
82+
" -----END CERTIFICATE-----",
83+
"KAFKA_SECURITY_PROTOCOL SASL_SSL",
84+
"KAFKA_KEY mykey",
85+
]
86+
),
87+
)
88+
)
89+
90+
assert parsed.ssl_ca_pem == "-----BEGIN CERTIFICATE-----\nMIIDXTCCAkWgAwIBAgIJAM\n-----END CERTIFICATE-----"
91+
assert parsed.security_protocol == "SASL_SSL"
92+
assert parsed.key == "mykey"
93+
94+
95+
def test_parse_connection_supports_single_line_ssl_ca_pem() -> None:
96+
parsed = parse_connection_file(
97+
_resource(
98+
"connection",
99+
"broker",
100+
"\n".join(
101+
[
102+
"TYPE kafka",
103+
"KAFKA_BOOTSTRAP_SERVERS localhost:9092",
104+
"KAFKA_SSL_CA_PEM {{ tb_secret('KAFKA_SSL_CA_PEM') }}",
105+
]
106+
),
107+
)
108+
)
109+
110+
assert parsed.ssl_ca_pem == "{{ tb_secret('KAFKA_SSL_CA_PEM') }}"
111+
112+
47113
def test_parse_datasource_supports_engine_is_deleted_and_kafka_store_raw_value() -> None:
48114
parsed = parse_datasource_file(
49115
_resource(

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)