Skip to content

Commit f0d1453

Browse files
author
Kyle Weaver
committed
[BEAM-9945] Ensure that the read index represents the number of fully processed elements including at the end of the channel or after splitting.
1 parent 605f15e commit f0d1453

1 file changed

Lines changed: 9 additions & 2 deletions

File tree

sdks/python/apache_beam/runners/worker/bundle_processor.py

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -191,14 +191,14 @@ def __init__(self,
191191
self.windowed_coder)
192192
]
193193
self.splitting_lock = threading.Lock()
194+
self.index = -1
195+
self.stop = float('inf')
194196
self.started = False
195197

196198
def start(self):
197199
# type: () -> None
198200
super(DataInputOperation, self).start()
199201
with self.splitting_lock:
200-
self.index = -1
201-
self.stop = float('inf')
202202
self.started = True
203203

204204
def process(self, windowed_value):
@@ -317,8 +317,15 @@ def is_valid_split_point(index):
317317
def finish(self):
318318
# type: () -> None
319319
with self.splitting_lock:
320+
self.index += 1
320321
self.started = False
321322

323+
def reset(self):
324+
# type: () -> None
325+
self.index = -1
326+
self.stop = float('inf')
327+
super(DataInputOperation, self).reset()
328+
322329

323330
class _StateBackedIterable(object):
324331
def __init__(self,

0 commit comments

Comments
 (0)