Skip to content

Commit 8f387f0

Browse files
author
Kyle Weaver
authored
Merge pull request #11697 from ibzib/BEAM-9945
[BEAM-9945] [release-2.21.0] Ensure that the read index represents the number of fully…
2 parents 9df8055 + f0d1453 commit 8f387f0

1 file changed

Lines changed: 9 additions & 2 deletions

File tree

sdks/python/apache_beam/runners/worker/bundle_processor.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,14 @@ def __init__(self,
191191
self.windowed_coder)
192192
]
193193
self.splitting_lock = threading.Lock()
194+
self.index = -1
195+
self.stop = float('inf')
194196
self.started = False
195197

196198
def start(self):
197199
# type: () -> None
198200
super(DataInputOperation, self).start()
199201
with self.splitting_lock:
200-
self.index = -1
201-
self.stop = float('inf')
202202
self.started = True
203203

204204
def process(self, windowed_value):
@@ -317,8 +317,15 @@ def is_valid_split_point(index):
317317
def finish(self):
318318
# type: () -> None
319319
with self.splitting_lock:
320+
self.index += 1
320321
self.started = False
321322

323+
def reset(self):
324+
# type: () -> None
325+
self.index = -1
326+
self.stop = float('inf')
327+
super(DataInputOperation, self).reset()
328+
322329

323330
class _StateBackedIterable(object):
324331
def __init__(self,

0 commit comments

Comments
 (0)