|
5 | 5 | import signal |
6 | 6 | import sys |
7 | 7 | from datetime import datetime, timedelta, timezone |
8 | | -from unittest.mock import MagicMock |
| 8 | +from unittest.mock import MagicMock, patch |
9 | 9 |
|
10 | 10 | import msgpack |
11 | 11 | import pytest |
| 12 | +import redis.exceptions |
12 | 13 |
|
13 | 14 | from arq.connections import ArqRedis, RedisSettings |
14 | 15 | from arq.constants import abort_jobs_ss, default_queue_name, expires_extra_ms, health_check_key_suffix, job_key_prefix |
@@ -1024,3 +1025,79 @@ async def test_worker_timezone_defaults_to_system_timezone(worker): |
1024 | 1025 | worker = worker(functions=[func(foobar)]) |
1025 | 1026 | assert worker.timezone is not None |
1026 | 1027 | assert worker.timezone == datetime.now().astimezone().tzinfo |
| 1028 | + |
| 1029 | + |
| 1030 | +@pytest.mark.parametrize( |
| 1031 | + 'exception_thrown', |
| 1032 | + [ |
| 1033 | + redis.exceptions.ConnectionError('Error while reading from host'), |
| 1034 | + redis.exceptions.TimeoutError('Timeout reading from host'), |
| 1035 | + ], |
| 1036 | +) |
| 1037 | +async def test_worker_retry(mocker, worker_retry, exception_thrown): |
| 1038 | + # Testing redis exceptions, with retry settings specified |
| 1039 | + worker = worker_retry(functions=[func(foobar)]) |
| 1040 | + |
| 1041 | + # patch db read_response to mimic connection exceptions |
| 1042 | + p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) |
| 1043 | + |
| 1044 | + # baseline |
| 1045 | + await worker.main() |
| 1046 | + await worker._poll_iteration() |
| 1047 | + |
| 1048 | + # spy method handling call_with_retry failure |
| 1049 | + spy = mocker.spy(worker.pool, '_disconnect_raise') |
| 1050 | + |
| 1051 | + try: |
| 1052 | + # start patch |
| 1053 | + p.start() |
| 1054 | + |
| 1055 | + # assert exception thrown |
| 1056 | + with pytest.raises(type(exception_thrown)): |
| 1057 | + await worker._poll_iteration() |
| 1058 | + |
| 1059 | + # assert retry counts and no exception thrown during '_disconnect_raise' |
| 1060 | + assert spy.call_count == 4 # retries setting + 1 |
| 1061 | + assert spy.spy_exception is None |
| 1062 | + |
| 1063 | + finally: |
| 1064 | + # stop patch to allow worker cleanup |
| 1065 | + p.stop() |
| 1066 | + |
| 1067 | + |
| 1068 | +@pytest.mark.parametrize( |
| 1069 | + 'exception_thrown', |
| 1070 | + [ |
| 1071 | + redis.exceptions.ConnectionError('Error while reading from host'), |
| 1072 | + redis.exceptions.TimeoutError('Timeout reading from host'), |
| 1073 | + ], |
| 1074 | +) |
| 1075 | +async def test_worker_crash(mocker, worker, exception_thrown): |
| 1076 | + # Testing redis exceptions, no retry settings specified |
| 1077 | + worker = worker(functions=[func(foobar)]) |
| 1078 | + |
| 1079 | + # patch db read_response to mimic connection exceptions |
| 1080 | + p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) |
| 1081 | + |
| 1082 | + # baseline |
| 1083 | + await worker.main() |
| 1084 | + await worker._poll_iteration() |
| 1085 | + |
| 1086 | + # spy method handling call_with_retry failure |
| 1087 | + spy = mocker.spy(worker.pool, '_disconnect_raise') |
| 1088 | + |
| 1089 | + try: |
| 1090 | + # start patch |
| 1091 | + p.start() |
| 1092 | + |
| 1093 | + # assert exception thrown |
| 1094 | + with pytest.raises(type(exception_thrown)): |
| 1095 | + await worker._poll_iteration() |
| 1096 | + |
| 1097 | + # assert no retry counts and exception thrown during '_disconnect_raise' |
| 1098 | + assert spy.call_count == 1 |
| 1099 | + assert spy.spy_exception == exception_thrown |
| 1100 | + |
| 1101 | + finally: |
| 1102 | + # stop patch to allow worker cleanup |
| 1103 | + p.stop() |
0 commit comments