-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathtest_extension.py
More file actions
187 lines (159 loc) · 5.72 KB
/
test_extension.py
File metadata and controls
187 lines (159 loc) · 5.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import boto3
import psycopg2
from localstack.utils.strings import short_uid
from localstack.utils.sync import retry
# Connection details for ParadeDB.
# Connections go through the LocalStack gateway with TCP proxying: PostgreSQL
# wire-protocol traffic and the AWS HTTP APIs share the same port (PORT is
# reused for the HTTP endpoint URL in test_mixed_tcp_and_http_traffic).
HOST = "paradedb.localhost.localstack.cloud"
PORT = 4566
# NOTE(review): presumably these must match the credentials/database the
# ParadeDB extension is configured with -- confirm against the extension setup.
USER = "myuser"
PASSWORD = "mypassword"
DATABASE = "mydatabase"
def get_connection():
    """Open a psycopg2 connection to ParadeDB, retrying while it is unavailable.

    The connection parameters come from the module-level HOST, PORT, USER,
    PASSWORD and DATABASE constants. The connect call is handed to
    LocalStack's ``retry`` helper with ``retries=15`` and ``sleep=2.0`` so the
    tests tolerate a server that is still starting up.

    Returns:
        The connection object returned by ``psycopg2.connect``. The caller is
        responsible for closing it.
    """
    connect_kwargs = {
        "host": HOST,
        "port": PORT,
        "user": USER,
        "password": PASSWORD,
        "database": DATABASE,
    }
    return retry(
        lambda: psycopg2.connect(**connect_kwargs),
        retries=15,
        sleep=2.0,
    )
def test_connect_to_paradedb():
    """Test basic connection to ParadeDB.

    Opens a connection via ``get_connection``, runs ``SELECT version();`` and
    asserts that the reported version string mentions PostgreSQL.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        try:
            # Check PostgreSQL version
            cursor.execute("SELECT version();")
            version = cursor.fetchone()[0]
            assert "PostgreSQL" in version
        finally:
            cursor.close()
    finally:
        # Close even when the query or the assertion fails, so a failing test
        # does not leave a dangling server connection behind.
        conn.close()
def test_paradedb_quickstart():
    """Test some of ParadeDB's quickstart examples.

    Creates a uniquely named sample table and a BM25 index on it, then checks
    a plain SELECT, a match-conjunction (``&&&``) query and a BM25-scored
    match-disjunction (``|||``) query against their expected rows. The table
    and index are dropped afterwards, whether or not the test passed.
    """
    conn = get_connection()
    cursor = conn.cursor()

    # Unique names per run so leftovers from other runs cannot collide.
    table_name = f"mock_items_{short_uid()}"
    index_name = f"{table_name}_idx"

    try:
        # Load sample data
        cursor.execute(f"""
            CALL paradedb.create_bm25_test_table(
                schema_name => 'public',
                table_name => '{table_name}'
            );
        """)

        # Create search index. It must be created under ``index_name`` so that
        # it is unique per run and is the same index the cleanup below drops.
        cursor.execute(f"""
            CREATE INDEX {index_name} ON {table_name}
            USING bm25 (id, description, category, rating, in_stock, created_at, metadata, weight_range)
            WITH (key_field='id');
        """)

        # NOTE: this query has no ORDER BY, so the expected rows rely on the
        # order in which the server happens to return the sample data.
        cursor.execute(f"""
            SELECT description, rating, category
            FROM {table_name}
            LIMIT 3;
        """)
        results = cursor.fetchall()
        assert results == [
            ("Ergonomic metal keyboard", 4, "Electronics"),
            ("Plastic Keyboard", 4, "Electronics"),
            ("Sleek running shoes", 5, "Footwear"),
        ]

        # Match conjunction
        cursor.execute(f"""
            SELECT description, rating, category
            FROM {table_name}
            WHERE description &&& 'running shoes' AND rating > 2
            ORDER BY rating
            LIMIT 5;
        """)
        results = cursor.fetchall()
        assert results == [("Sleek running shoes", 5, "Footwear")]

        # BM25 scoring
        cursor.execute(f"""
            SELECT description, pdb.score(id)
            FROM {table_name}
            WHERE description ||| 'running shoes' AND rating > 2
            ORDER BY score DESC
            LIMIT 5;
        """)
        results = cursor.fetchall()
        # NOTE: scores are compared for exact float equality.
        assert results == [
            ("Sleek running shoes", 6.817111),
            ("Generic shoes", 3.8772602),
            ("White jogging shoes", 3.4849067),
        ]
    except Exception:
        # A failed statement leaves the transaction aborted, and every further
        # statement (including the cleanup DROPs) would then be rejected and
        # mask the original error. Roll back first, then re-raise.
        conn.rollback()
        raise
    finally:
        try:
            # Cleanup - drop index first, then table
            cursor.execute(f"DROP INDEX IF EXISTS {index_name};")
            cursor.execute(f"DROP TABLE IF EXISTS {table_name};")
            conn.commit()
        finally:
            # Release the cursor and connection even if the cleanup fails.
            cursor.close()
            conn.close()
def test_mixed_tcp_and_http_traffic():
    """
    Test that mixed TCP (ParadeDB) and HTTP (AWS) traffic works correctly.

    This verifies that the ParadeDB extension only intercepts PostgreSQL wire
    protocol connections and doesn't interfere with regular HTTP-based AWS
    API requests to LocalStack.

    The test runs a PostgreSQL query, then S3 and STS requests against the
    same port over HTTP, then a second PostgreSQL query.
    """

    def _fetch_scalar(query):
        """Run ``query`` on a fresh ParadeDB connection; return the first column of the first row."""
        conn = get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query)
                return cursor.fetchone()[0]
            finally:
                cursor.close()
        finally:
            # Always release the connection, even if the query fails.
            conn.close()

    # First, verify ParadeDB TCP connection works
    assert _fetch_scalar("SELECT 1 as test_value;") == 1, (
        "ParadeDB TCP connection should work"
    )

    # Now verify AWS HTTP requests still work (S3 and STS)
    # These should NOT be intercepted by the ParadeDB extension
    endpoint_url = f"http://localhost:{PORT}"

    # Test S3 HTTP requests
    s3_client = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )

    bucket_name = f"test-bucket-{short_uid()}"
    test_key = "test-object.txt"
    test_content = b"Hello from mixed TCP/HTTP test!"

    s3_client.create_bucket(Bucket=bucket_name)
    try:
        # List buckets to verify HTTP API is working
        buckets = s3_client.list_buckets()
        bucket_names = [b["Name"] for b in buckets["Buckets"]]
        assert bucket_name in bucket_names, (
            "S3 HTTP API should work alongside ParadeDB TCP"
        )

        # Put and get an object
        s3_client.put_object(Bucket=bucket_name, Key=test_key, Body=test_content)
        response = s3_client.get_object(Bucket=bucket_name, Key=test_key)
        retrieved_content = response["Body"].read()
        assert retrieved_content == test_content, "S3 object operations should work"
    finally:
        # Clean up S3 even when an assertion above fails, so the bucket is not
        # left behind. NOTE(review): relies on DeleteObject succeeding for a
        # key that was never written -- confirm against the S3 API reference.
        s3_client.delete_object(Bucket=bucket_name, Key=test_key)
        s3_client.delete_bucket(Bucket=bucket_name)

    # Test STS HTTP requests
    sts_client = boto3.client(
        "sts",
        endpoint_url=endpoint_url,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )

    caller_identity = sts_client.get_caller_identity()
    assert "Account" in caller_identity, (
        "STS HTTP API should work alongside ParadeDB TCP"
    )
    assert "Arn" in caller_identity, "STS should return valid caller identity"

    # Finally, verify ParadeDB still works after HTTP requests
    assert (
        _fetch_scalar("SELECT 'tcp_works_after_http' as verification;")
        == "tcp_works_after_http"
    ), "ParadeDB should still work after HTTP requests"