Skip to content

Commit 3fb0ff6

Browse files
danthevihji
authored andcommitted
[BEAM-13599] Fix overflow error in Python Datastore RampupThrottlingFn
1 parent d5e75c5 commit 3fb0ff6

2 files changed

Lines changed: 21 additions & 1 deletion

File tree

sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ def _calc_max_ops_budget(
7171
growth = max(
7272
0.0, (timedelta_since_first - self._RAMP_UP_INTERVAL) /
7373
self._RAMP_UP_INTERVAL)
74-
max_ops_budget = int(self._BASE_BUDGET / self._num_workers * (1.5**growth))
74+
try:
75+
max_ops_budget = int(
76+
self._BASE_BUDGET / self._num_workers * (1.5**growth))
77+
except OverflowError:
78+
max_ops_budget = float('inf')
7579
return max(1, max_ops_budget)
7680

7781
def process(self, element, **kwargs):

sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn_test.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#
1717

1818
import datetime
19+
import math
20+
import sys
1921
import unittest
2022

2123
from mock import patch
@@ -57,6 +59,20 @@ def test_rampup_throttling(self, mock_sleep, mock_datetime):
5759
with self.assertRaises(_RampupDelayException):
5860
next(throttling_fn.process(None))
5961

62+
def test_budget_overflow(self):
63+
throttling_fn = RampupThrottlingFn(num_workers=1)
64+
65+
normal_date = DATE_ZERO + datetime.timedelta(minutes=2000)
66+
normal_budget = throttling_fn._calc_max_ops_budget(DATE_ZERO, normal_date)
67+
self.assertNotEqual(normal_budget, float('inf'))
68+
69+
# This tests that a previously thrown OverflowError is caught.
70+
overflow_minutes = math.log(sys.float_info.max) / math.log(1.5) * 5
71+
overflow_date = DATE_ZERO + datetime.timedelta(minutes=overflow_minutes)
72+
overflow_budget = throttling_fn._calc_max_ops_budget(
73+
DATE_ZERO, overflow_date)
74+
self.assertEqual(overflow_budget, float('inf'))
75+
6076

6177
if __name__ == '__main__':
6278
unittest.main()

0 commit comments

Comments
 (0)