Skip to content

Commit 605f15e

Browse files
author
Kyle Weaver
committed
[BEAM-9945] [release-2.21.0] Report data channel progress via a designated counter.
1 parent 9969b94 commit 605f15e

3 files changed

Lines changed: 29 additions & 1 deletion

File tree

model/pipeline/src/main/proto/metrics.proto

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,21 @@ message MonitoringInfoSpecs {
292292
value: "The remaining amount of work for each active element. Each active element represents an independent amount of work not shared with any other active element."
293293
}]
294294
}];
295+
296+
// The (0-based) index of the latest item processed from the data channel.
297+
// This gives an indication of the SDKs progress through the data channel,
298+
// and is a lower bound on where it is able to split.
299+
// For an SDK that processes items sequentially, this is equivalently the
300+
// number of items fully processed (or -1 if processing has not yet started).
301+
DATA_CHANNEL_READ_INDEX = 18 [(monitoring_info_spec) = {
302+
urn: "beam:metric:data_channel:read_index:v1",
303+
type: "beam:metrics:sum_int64:v1",
304+
required_labels: [ "PTRANSFORM" ],
305+
annotations: [{
306+
key: "description",
307+
value: "The read index of the data channel."
308+
}]
309+
}];
295310
}
296311
}
297312

@@ -511,4 +526,3 @@ message MonitoringInfoTypeUrns {
511526
// repeated string column_names = 1;
512527
// repeated MonitoringRow row_data = 2;
513528
// }
514-

sdks/python/apache_beam/metrics/monitoring_infos.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
[USER_COUNTER_URN, USER_DISTRIBUTION_URN, USER_GAUGE_URN])
5757
WORK_REMAINING_URN = common_urns.monitoring_info_specs.WORK_REMAINING.spec.urn
5858
WORK_COMPLETED_URN = common_urns.monitoring_info_specs.WORK_COMPLETED.spec.urn
59+
DATA_CHANNEL_READ_INDEX = (
60+
common_urns.monitoring_info_specs.DATA_CHANNEL_READ_INDEX.spec.urn)
5961

6062
# TODO(ajamato): Implement the remaining types, i.e. Double types
6163
# Extrema types, etc. See:

sdks/python/apache_beam/runners/worker/bundle_processor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,18 @@ def process_encoded(self, encoded_windowed_values):
217217
input_stream, True)
218218
self.output(decoded_value)
219219

220+
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
221+
# type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
222+
all_monitoring_infos = super(DataInputOperation, self).monitoring_infos(
223+
transform_id, tag_to_pcollection_id)
224+
read_progress_info = monitoring_infos.int64_counter(
225+
monitoring_infos.DATA_CHANNEL_READ_INDEX,
226+
self.index,
227+
ptransform=transform_id)
228+
all_monitoring_infos[monitoring_infos.to_key(
229+
read_progress_info)] = read_progress_info
230+
return all_monitoring_infos
231+
220232
def try_split(
221233
self, fraction_of_remainder, total_buffer_size, allowed_split_points):
222234
# type: (...) -> Optional[Tuple[int, Optional[operations.SdfSplitResultsPrimary], Optional[operations.SdfSplitResultsResidual], int]]

0 commit comments

Comments
 (0)