Skip to content

Commit 9b99326

Browse files
committed
Add simple examples
Changes ======= * Cluster/Scope examples added for both sync and async APIs * Backoff exponent set to 2 (changed from 1.5) * Cleaned up test suite test server request/response functionality
1 parent 0d2c08f commit 9b99326

13 files changed

Lines changed: 546 additions & 79 deletions

File tree

acouchbase_analytics/protocol/_core/request_context.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,9 @@ def start_stream(self, core_response: HttpCoreResponse) -> None:
424424
# TODO: logging; I don't think this is an error...
425425
return
426426

427-
self._json_stream = AsyncJsonStream(core_response.aiter_bytes(), stream_config=self._stream_config)
427+
self._json_stream = AsyncJsonStream(
428+
core_response.aiter_bytes(), stream_config=self._stream_config, logger_handler=self.log_message
429+
)
428430
self._start_next_stage(self._json_stream.start_parsing)
429431

430432
async def wait_for_results_or_errors(self) -> None:

couchbase_analytics/_version.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# This file automatically generated by
2-
# /Users/jaredcasey/GIT/couchbase/clients/python/analytics-python-client/couchbase_analytics_version.py
2+
# /Users/jaredcasey/GIT/couchbase/clients/python/analytics-python-client/./couchbase_analytics_version.py
33
# at
4-
# 2025-07-16 15:02:22.211821
5-
__version__ = '1.0.0.dev1'
4+
# 2025-07-21 18:36:10.973732
5+
__version__ = '0.0.1'

couchbase_analytics/common/backoff_calculator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def calculate_backoff(self, retry_count: int) -> float:
2727
class DefaultBackoffCalculator(BackoffCalculator):
2828
MIN = 100
2929
MAX = 60 * 1000
30-
EXPONENT_BASE = 1.5
30+
EXPONENT_BASE = 2
3131

3232
def __init__(
3333
self, min: Optional[int] = None, max: Optional[int] = None, exponent_base: Optional[int] = None

couchbase_analytics/common/logging.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,29 @@
1616
import logging
1717
from enum import Enum
1818

19-
LOG_FORMAT_ARR = ['[%(asctime)s.%(msecs)03d]',
20-
'%(relativeCreated)dms',
21-
'[%(levelname)s]',
22-
'[%(process)d, %(threadName)s (%(thread)d)]'
23-
' %(name)s',
24-
'- %(message)s']
19+
LOG_FORMAT_ARR = [
20+
'[%(asctime)s.%(msecs)03d]',
21+
'%(relativeCreated)dms',
22+
'[%(levelname)s]',
23+
'[%(process)d, %(threadName)s (%(thread)d)] %(name)s',
24+
'- %(message)s',
25+
]
2526
LOG_FORMAT = ' '.join(LOG_FORMAT_ARR)
2627
LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
2728

29+
2830
class LogLevel(Enum):
2931
DEBUG = logging.DEBUG
3032
INFO = logging.INFO
3133
WARNING = logging.WARNING
3234
ERROR = logging.ERROR
3335
CRITICAL = logging.CRITICAL
3436

37+
3538
def log_message(logger: logging.Logger, message: str, log_level: LogLevel) -> None:
3639
if not logger or not logger.hasHandlers():
3740
return
38-
41+
3942
if log_level == LogLevel.DEBUG:
4043
logger.debug(message)
4144
elif log_level == LogLevel.INFO:
@@ -45,4 +48,4 @@ def log_message(logger: logging.Logger, message: str, log_level: LogLevel) -> No
4548
elif log_level == LogLevel.ERROR:
4649
logger.error(message)
4750
elif log_level == LogLevel.CRITICAL:
48-
logger.critical(message)
51+
logger.critical(message)
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Copyright 2016-2024. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
from datetime import timedelta
18+
19+
# NOTE: anyio is a dependency of acouchbase_analytics
20+
import anyio
21+
22+
from acouchbase_analytics.cluster import AsyncCluster
23+
from acouchbase_analytics.credential import Credential
24+
from acouchbase_analytics.options import ClusterOptions, QueryOptions, TimeoutOptions
25+
26+
27+
async def main() -> None:
28+
# Update this to your cluster
29+
endpoint = 'https://--your-instance--'
30+
username = 'username'
31+
pw = 'Password!123'
32+
# User Input ends here.
33+
34+
cred = Credential.from_username_and_password(username, pw)
35+
# NOTE: Only an example on how to use options. Not a recommendation.
36+
timeout_opts = TimeoutOptions(query_timeout=timedelta(seconds=30))
37+
cluster = AsyncCluster.create_instance(endpoint, cred, ClusterOptions(timeout_options=timeout_opts))
38+
39+
# Execute a query and buffer all result rows in client memory.
40+
statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
41+
res = await cluster.execute_query(statement)
42+
all_rows = await res.get_all_rows()
43+
# NOTE: all_rows is a list, _do not_ use `async for`
44+
for row in all_rows:
45+
print(f'Found row: {row}')
46+
print(f'metadata={res.metadata()}')
47+
48+
# Execute a query and process rows as they arrive from server.
49+
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
50+
res = await cluster.execute_query(statement)
51+
async for row in res.rows():
52+
print(f'Found row: {row}')
53+
print(f'metadata={res.metadata()}')
54+
55+
# Execute a streaming query with positional arguments.
56+
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
57+
res = await cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
58+
async for row in res:
59+
print(f'Found row: {row}')
60+
print(f'metadata={res.metadata()}')
61+
62+
# Execute a streaming query with named arguments.
63+
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
64+
res = await cluster.execute_query(
65+
statement, QueryOptions(named_parameters={'country': 'United States', 'limit': 10})
66+
)
67+
async for row in res.rows():
68+
print(f'Found row: {row}')
69+
print(f'metadata={res.metadata()}')
70+
71+
72+
if __name__ == '__main__':
73+
anyio.run(main)
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright 2016-2024. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import asyncio
17+
from datetime import timedelta
18+
19+
from acouchbase_analytics.cluster import AsyncCluster
20+
from acouchbase_analytics.credential import Credential
21+
from acouchbase_analytics.options import ClusterOptions, QueryOptions, TimeoutOptions
22+
23+
24+
async def main() -> None:
25+
# Update this to your cluster
26+
endpoint = 'https://--your-instance--'
27+
username = 'username'
28+
pw = 'Password!123'
29+
# User Input ends here.
30+
31+
cred = Credential.from_username_and_password(username, pw)
32+
# NOTE: Only an example on how to use options. Not a recommendation.
33+
timeout_opts = TimeoutOptions(query_timeout=timedelta(seconds=30))
34+
cluster = AsyncCluster.create_instance(endpoint, cred, ClusterOptions(timeout_options=timeout_opts))
35+
36+
# Execute a query and buffer all result rows in client memory.
37+
statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
38+
res = await cluster.execute_query(statement)
39+
all_rows = await res.get_all_rows()
40+
# NOTE: all_rows is a list, _do not_ use `async for`
41+
for row in all_rows:
42+
print(f'Found row: {row}')
43+
print(f'metadata={res.metadata()}')
44+
45+
# Execute a query and process rows as they arrive from server.
46+
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
47+
res = await cluster.execute_query(statement)
48+
async for row in res.rows():
49+
print(f'Found row: {row}')
50+
print(f'metadata={res.metadata()}')
51+
52+
# Execute a streaming query with positional arguments.
53+
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
54+
res = await cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
55+
async for row in res:
56+
print(f'Found row: {row}')
57+
print(f'metadata={res.metadata()}')
58+
59+
# Execute a streaming query with named arguments.
60+
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
61+
res = await cluster.execute_query(
62+
statement, QueryOptions(named_parameters={'country': 'United States', 'limit': 10})
63+
)
64+
async for row in res.rows():
65+
print(f'Found row: {row}')
66+
print(f'metadata={res.metadata()}')
67+
68+
69+
if __name__ == '__main__':
70+
asyncio.run(main())
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Copyright 2016-2024. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
from datetime import timedelta
18+
19+
# NOTE: anyio is a dependency of acouchbase_analytics
20+
import anyio
21+
22+
from acouchbase_analytics.cluster import AsyncCluster
23+
from acouchbase_analytics.credential import Credential
24+
from acouchbase_analytics.options import ClusterOptions, QueryOptions, TimeoutOptions
25+
26+
27+
async def main() -> None:
28+
# Update this to your cluster
29+
endpoint = 'https://--your-instance--'
30+
username = 'username'
31+
pw = 'Password!123'
32+
# User Input ends here.
33+
34+
cred = Credential.from_username_and_password(username, pw)
35+
# NOTE: Only an example on how to use options. Not a recommendation.
36+
timeout_opts = TimeoutOptions(query_timeout=timedelta(seconds=30))
37+
opts = ClusterOptions(timeout_options=timeout_opts)
38+
scope = AsyncCluster.create_instance(endpoint, cred, opts).database('travel-sample').scope('inventory')
39+
40+
# Execute a scope-level query and buffer all result rows in client memory.
41+
statement = 'SELECT * FROM airline LIMIT 10;'
42+
res = await scope.execute_query(statement)
43+
all_rows = await res.get_all_rows()
44+
# NOTE: all_rows is a list, _do not_ use `async for`
45+
for row in all_rows:
46+
print(f'Found row: {row}')
47+
print(f'metadata={res.metadata()}')
48+
49+
# Execute a scope-level query and process rows as they arrive from server.
50+
statement = 'SELECT * FROM airline WHERE country="United States" LIMIT 10;'
51+
res = await scope.execute_query(statement)
52+
async for row in res.rows():
53+
print(f'Found row: {row}')
54+
print(f'metadata={res.metadata()}')
55+
56+
# Execute a streaming scope-level query with positional arguments.
57+
statement = 'SELECT * FROM airline WHERE country=$1 LIMIT $2;'
58+
res = await scope.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
59+
async for row in res:
60+
print(f'Found row: {row}')
61+
print(f'metadata={res.metadata()}')
62+
63+
# Execute a streaming scope-level query with named arguments.
64+
statement = 'SELECT * FROM airline WHERE country=$country LIMIT $limit;'
65+
res = await scope.execute_query(statement, QueryOptions(named_parameters={'country': 'United States', 'limit': 10}))
66+
async for row in res.rows():
67+
print(f'Found row: {row}')
68+
print(f'metadata={res.metadata()}')
69+
70+
71+
if __name__ == '__main__':
72+
anyio.run(main)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Copyright 2016-2024. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import asyncio
17+
from datetime import timedelta
18+
19+
from acouchbase_analytics.cluster import AsyncCluster
20+
from acouchbase_analytics.credential import Credential
21+
from acouchbase_analytics.options import ClusterOptions, QueryOptions, TimeoutOptions
22+
23+
24+
async def main() -> None:
25+
# Update this to your cluster
26+
endpoint = 'https://--your-instance--'
27+
username = 'username'
28+
pw = 'Password!123'
29+
# User Input ends here.
30+
31+
cred = Credential.from_username_and_password(username, pw)
32+
# NOTE: Only an example on how to use options. Not a recommendation.
33+
timeout_opts = TimeoutOptions(query_timeout=timedelta(seconds=30))
34+
opts = ClusterOptions(timeout_options=timeout_opts)
35+
scope = AsyncCluster.create_instance(endpoint, cred, opts).database('travel-sample').scope('inventory')
36+
37+
# Execute a scope-level query and buffer all result rows in client memory.
38+
statement = 'SELECT * FROM airline LIMIT 10;'
39+
res = await scope.execute_query(statement)
40+
all_rows = await res.get_all_rows()
41+
# NOTE: all_rows is a list, _do not_ use `async for`
42+
for row in all_rows:
43+
print(f'Found row: {row}')
44+
print(f'metadata={res.metadata()}')
45+
46+
# Execute a scope-level query and process rows as they arrive from server.
47+
statement = 'SELECT * FROM airline WHERE country="United States" LIMIT 10;'
48+
res = await scope.execute_query(statement)
49+
async for row in res.rows():
50+
print(f'Found row: {row}')
51+
print(f'metadata={res.metadata()}')
52+
53+
# Execute a streaming scope-level query with positional arguments.
54+
statement = 'SELECT * FROM airline WHERE country=$1 LIMIT $2;'
55+
res = await scope.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
56+
async for row in res:
57+
print(f'Found row: {row}')
58+
print(f'metadata={res.metadata()}')
59+
60+
# Execute a streaming scope-level query with named arguments.
61+
statement = 'SELECT * FROM airline WHERE country=$country LIMIT $limit;'
62+
res = await scope.execute_query(statement, QueryOptions(named_parameters={'country': 'United States', 'limit': 10}))
63+
async for row in res.rows():
64+
print(f'Found row: {row}')
65+
print(f'metadata={res.metadata()}')
66+
67+
68+
if __name__ == '__main__':
69+
asyncio.run(main())

0 commit comments

Comments
 (0)