-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathtest_async_tx.py
More file actions
161 lines (139 loc) · 5.61 KB
/
test_async_tx.py
File metadata and controls
161 lines (139 loc) · 5.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#
# This source file is part of the EdgeDB open source project.
#
# Copyright 2016-present MagicStack Inc. and the EdgeDB authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import itertools
import edgedb
from edgedb import _testbase as tb
from edgedb import TransactionOptions
from edgedb.options import RetryOptions
class TestAsyncTx(tb.AsyncQueryTestCase):
SETUP = '''
CREATE TYPE test::TransactionTest EXTENDING std::Object {
CREATE PROPERTY name -> std::str;
};
'''
TEARDOWN_METHOD = '''
DELETE test::TransactionTest;
'''
TEARDOWN = '''
DROP TYPE test::TransactionTest;
'''
async def test_async_transaction_regular_01(self):
tr = self.client.with_retry_options(
RetryOptions(attempts=1)).transaction()
with self.assertRaises(ZeroDivisionError):
async for with_tr in tr:
async with with_tr:
await with_tr.execute('''
INSERT test::TransactionTest {
name := 'Test Transaction'
};
''')
1 / 0
result = await self.client.query('''
SELECT
test::TransactionTest
FILTER
test::TransactionTest.name = 'Test Transaction';
''')
self.assertEqual(result, [])
async def test_async_transaction_kinds(self):
isolations = [
None,
edgedb.IsolationLevel.Serializable,
]
booleans = [None, True, False]
all = itertools.product(isolations, booleans, booleans)
for isolation, readonly, deferrable in all:
opt = dict(
isolation=isolation,
readonly=readonly,
deferrable=deferrable,
)
# skip None
opt = {k: v for k, v in opt.items() if v is not None}
client = self.client.with_transaction_options(
TransactionOptions(**opt)
)
async for tx in client.transaction():
async with tx:
pass
async def test_async_transaction_commit_failure(self):
with self.assertRaises(edgedb.errors.QueryError):
async for tx in self.client.transaction():
async with tx:
await tx.execute("start migration to {};")
self.assertEqual(await self.client.query_single("select 42"), 42)
async def test_async_transaction_exclusive(self):
async for tx in self.client.transaction():
async with tx:
query = "select sys::_sleep(0.01)"
f1 = self.loop.create_task(tx.execute(query))
f2 = self.loop.create_task(tx.execute(query))
with self.assertRaisesRegex(
edgedb.InterfaceError,
"concurrent queries within the same transaction "
"are not allowed"
):
await asyncio.wait_for(f1, timeout=5)
await asyncio.wait_for(f2, timeout=5)
async def test_async_transaction_savepoint_1(self):
async for tx in self.client.transaction():
async with tx:
sp1 = await tx.declare_savepoint("sp1")
sp2 = await tx.declare_savepoint("sp2")
with self.assertRaisesRegex(
edgedb.InterfaceError, "savepoint.*already exists"
):
await tx.declare_savepoint("sp1")
await tx.execute('''
INSERT test::TransactionTest { name := '1' };
''')
await sp2.release()
with self.assertRaisesRegex(
edgedb.InterfaceError, "savepoint.*is no longer active"
):
await sp2.release()
await sp1.release()
result = await self.client.query('SELECT test::TransactionTest.name')
self.assertEqual(result, ["1"])
async def test_async_transaction_savepoint_2(self):
async for tx in self.client.transaction():
async with tx:
await tx.execute('''
INSERT test::TransactionTest { name := '1' };
''')
sp1 = await tx.declare_savepoint("sp1")
await tx.execute('''
INSERT test::TransactionTest { name := '2' };
''')
sp2 = await tx.declare_savepoint("sp2")
await tx.execute('''
INSERT test::TransactionTest { name := '3' };
''')
await sp1.rollback()
with self.assertRaisesRegex(
edgedb.InterfaceError, "savepoint.*is no longer active"
):
await sp1.rollback()
with self.assertRaisesRegex(
edgedb.InterfaceError, "savepoint.*is no longer active"
):
await sp2.rollback()
result = await self.client.query('SELECT test::TransactionTest.name')
self.assertEqual(result, ["1"])