Skip to content

Commit 19fd810

Browse files
committed
Put ListenerQueue in separate file
1 parent 7cada51 commit 19fd810

4 files changed

Lines changed: 66 additions & 111 deletions

File tree

gql/transport/aiohttp_websockets.py

Lines changed: 4 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,72 +22,19 @@
2222
from graphql import DocumentNode, ExecutionResult, print_ast
2323
from multidict import CIMultiDictProxy
2424

25-
from gql.transport.aiohttp import AIOHTTPTransport
26-
from gql.transport.async_transport import AsyncTransport
27-
from gql.transport.exceptions import (
25+
from .aiohttp import AIOHTTPTransport
26+
from .async_transport import AsyncTransport
27+
from .exceptions import (
2828
TransportAlreadyConnected,
2929
TransportClosed,
3030
TransportProtocolError,
3131
TransportQueryError,
3232
TransportServerError,
3333
)
34+
from .websockets_common import ListenerQueue
3435

3536
log = logging.getLogger("gql.transport.aiohttp_websockets")
3637

37-
ParsedAnswer = Tuple[str, Optional[ExecutionResult]]
38-
39-
40-
class ListenerQueue:
41-
"""Special queue used for each query waiting for server answers
42-
43-
If the server is stopped while the listener is still waiting,
44-
Then we send an exception to the queue and this exception will be raised
45-
to the consumer once all the previous messages have been consumed from the queue
46-
"""
47-
48-
def __init__(self, query_id: int, send_stop: bool) -> None:
49-
self.query_id: int = query_id
50-
self.send_stop: bool = send_stop
51-
self._queue: asyncio.Queue = asyncio.Queue()
52-
self._closed: bool = False
53-
54-
async def get(self) -> ParsedAnswer:
55-
56-
try:
57-
item = self._queue.get_nowait()
58-
except asyncio.QueueEmpty:
59-
item = await self._queue.get()
60-
61-
self._queue.task_done()
62-
63-
# If we receive an exception when reading the queue, we raise it
64-
if isinstance(item, Exception):
65-
self._closed = True
66-
raise item
67-
68-
# Don't need to save new answers or
69-
# send the stop message if we already received the complete message
70-
answer_type, execution_result = item
71-
if answer_type == "complete":
72-
self.send_stop = False
73-
self._closed = True
74-
75-
return item
76-
77-
async def put(self, item: ParsedAnswer) -> None:
78-
79-
if not self._closed:
80-
await self._queue.put(item)
81-
82-
async def set_exception(self, exception: Exception) -> None:
83-
84-
# Put the exception in the queue
85-
await self._queue.put(exception)
86-
87-
# Don't need to send stop messages in case of error
88-
self.send_stop = False
89-
self._closed = True
90-
9138

9239
class AIOHTTPWebsocketsTransport(AsyncTransport):
9340

gql/transport/websockets_base.py

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,63 +21,10 @@
2121
TransportQueryError,
2222
TransportServerError,
2323
)
24+
from .websockets_common import ListenerQueue
2425

2526
log = logging.getLogger("gql.transport.websockets")
2627

27-
ParsedAnswer = Tuple[str, Optional[ExecutionResult]]
28-
29-
30-
class ListenerQueue:
31-
"""Special queue used for each query waiting for server answers
32-
33-
If the server is stopped while the listener is still waiting,
34-
Then we send an exception to the queue and this exception will be raised
35-
to the consumer once all the previous messages have been consumed from the queue
36-
"""
37-
38-
def __init__(self, query_id: int, send_stop: bool) -> None:
39-
self.query_id: int = query_id
40-
self.send_stop: bool = send_stop
41-
self._queue: asyncio.Queue = asyncio.Queue()
42-
self._closed: bool = False
43-
44-
async def get(self) -> ParsedAnswer:
45-
46-
try:
47-
item = self._queue.get_nowait()
48-
except asyncio.QueueEmpty:
49-
item = await self._queue.get()
50-
51-
self._queue.task_done()
52-
53-
# If we receive an exception when reading the queue, we raise it
54-
if isinstance(item, Exception):
55-
self._closed = True
56-
raise item
57-
58-
# Don't need to save new answers or
59-
# send the stop message if we already received the complete message
60-
answer_type, execution_result = item
61-
if answer_type == "complete":
62-
self.send_stop = False
63-
self._closed = True
64-
65-
return item
66-
67-
async def put(self, item: ParsedAnswer) -> None:
68-
69-
if not self._closed:
70-
await self._queue.put(item)
71-
72-
async def set_exception(self, exception: Exception) -> None:
73-
74-
# Put the exception in the queue
75-
await self._queue.put(exception)
76-
77-
# Don't need to send stop messages in case of error
78-
self.send_stop = False
79-
self._closed = True
80-
8128

8229
class WebsocketsTransportBase(AsyncTransport):
8330
"""abstract :ref:`Async Transport <async_transports>` used to implement
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .listener_queue import ListenerQueue, ParsedAnswer
2+
3+
__all__ = ["ListenerQueue", "ParsedAnswer"]
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import asyncio
2+
from typing import Optional, Tuple
3+
4+
from graphql import ExecutionResult
5+
6+
ParsedAnswer = Tuple[str, Optional[ExecutionResult]]
7+
8+
9+
class ListenerQueue:
10+
"""Special queue used for each query waiting for server answers
11+
12+
If the server is stopped while the listener is still waiting,
13+
Then we send an exception to the queue and this exception will be raised
14+
to the consumer once all the previous messages have been consumed from the queue
15+
"""
16+
17+
def __init__(self, query_id: int, send_stop: bool) -> None:
18+
self.query_id: int = query_id
19+
self.send_stop: bool = send_stop
20+
self._queue: asyncio.Queue = asyncio.Queue()
21+
self._closed: bool = False
22+
23+
async def get(self) -> ParsedAnswer:
24+
25+
try:
26+
item = self._queue.get_nowait()
27+
except asyncio.QueueEmpty:
28+
item = await self._queue.get()
29+
30+
self._queue.task_done()
31+
32+
# If we receive an exception when reading the queue, we raise it
33+
if isinstance(item, Exception):
34+
self._closed = True
35+
raise item
36+
37+
# Don't need to save new answers or
38+
# send the stop message if we already received the complete message
39+
answer_type, execution_result = item
40+
if answer_type == "complete":
41+
self.send_stop = False
42+
self._closed = True
43+
44+
return item
45+
46+
async def put(self, item: ParsedAnswer) -> None:
47+
48+
if not self._closed:
49+
await self._queue.put(item)
50+
51+
async def set_exception(self, exception: Exception) -> None:
52+
53+
# Put the exception in the queue
54+
await self._queue.put(exception)
55+
56+
# Don't need to send stop messages in case of error
57+
self.send_stop = False
58+
self._closed = True

0 commit comments

Comments
 (0)