Skip to content

Commit a707a11

Browse files
authored
Merge pull request #9682: [BEAM-8314] Add aggregation logic to beam_fn_api metric counter updat…
[BEAM-8314]: Add aggregation logic to beam_fn_api metric counter updates in StreamingDataflowWorker
2 parents 9e2e8bc + b5f5b81 commit a707a11

22 files changed

Lines changed: 647 additions & 21 deletions

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.beam.runners.core.SimpleDoFnRunner;
3131
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
3232
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
33+
import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
3334
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
3435
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
3536
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
@@ -282,7 +283,7 @@ protected CounterUpdate createUpdate(boolean isCumulative, long value) {
282283
.setStructuredNameAndMetadata(
283284
new CounterStructuredNameAndMetadata()
284285
.setName(name)
285-
.setMetadata(new CounterMetadata().setKind("SUM")))
286+
.setMetadata(new CounterMetadata().setKind(Kind.SUM.toString())))
286287
.setCumulative(isCumulative)
287288
.setInteger(longToSplitInt(value));
288289
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public String toString() {
5252
/** Well-defined {@code kind} strings for use in {@link CounterUpdate} protos. */
5353
public enum Kind {
5454
DISTRIBUTION("DISTRIBUTION"),
55+
MEAN("MEAN"),
5556
SUM("SUM");
5657

5758
private final String kind;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions;
7979
import org.apache.beam.runners.dataflow.worker.counters.Counter;
8080
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
81+
import org.apache.beam.runners.dataflow.worker.counters.CounterUpdateAggregators;
8182
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
8283
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
8384
import org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction;
@@ -129,6 +130,7 @@
129130
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
130131
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.TextFormat;
131132
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
133+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
132134
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
133135
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
134136
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
@@ -198,6 +200,8 @@ public class StreamingDataflowWorker {
198200
/** Maximum number of failure stacktraces to report in each update sent to backend. */
199201
private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000;
200202

203+
private final AtomicLong counterAggregationErrorCount = new AtomicLong();
204+
201205
/** Returns whether an exception was caused by a {@link OutOfMemoryError}. */
202206
private static boolean isOutOfMemoryError(Throwable t) {
203207
while (t != null) {
@@ -1891,6 +1895,8 @@ private void sendWorkerUpdatesToDataflowService(
18911895
counterUpdates.addAll(
18921896
deltaCounters.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE));
18931897
if (hasExperiment(options, "beam_fn_api")) {
1898+
Map<Object, List<CounterUpdate>> fnApiCounters = new HashMap<>();
1899+
18941900
while (!this.pendingMonitoringInfos.isEmpty()) {
18951901
final CounterUpdate item = this.pendingMonitoringInfos.poll();
18961902

@@ -1900,16 +1906,49 @@ private void sendWorkerUpdatesToDataflowService(
19001906
// WorkItem.
19011907
if (item.getCumulative()) {
19021908
item.setCumulative(false);
1909+
// Group counterUpdates by counterUpdateKey so they can be aggregated before sending to
1910+
// dataflow service.
1911+
fnApiCounters
1912+
.computeIfAbsent(getCounterUpdateKey(item), k -> new ArrayList<>())
1913+
.add(item);
19031914
} else {
19041915
// In current world all counters coming from FnAPI are cumulative.
19051916
// This is a safety check in case new counter type appears in FnAPI.
19061917
throw new UnsupportedOperationException(
19071918
"FnApi counters are expected to provide cumulative values."
1908-
+ " Please, update convertion to delta logic"
1919+
+ " Please, update conversion to delta logic"
19091920
+ " if non-cumulative counter type is required.");
19101921
}
1922+
}
1923+
1924+
// Aggregates counterUpdates with same counterUpdateKey to single CounterUpdate if possible
1925+
// so we can avoid excessive I/Os for reporting to dataflow service.
1926+
for (List<CounterUpdate> counterUpdateList : fnApiCounters.values()) {
1927+
if (counterUpdateList.isEmpty()) {
1928+
continue;
1929+
}
1930+
List<CounterUpdate> aggregatedCounterUpdateList =
1931+
CounterUpdateAggregators.aggregate(counterUpdateList);
1932+
1933+
// Log a warning message if encountered enough non-aggregatable counter updates since this
1934+
// can lead to a severe performance penalty if dataflow service can not handle the
1935+
// updates.
1936+
if (aggregatedCounterUpdateList.size() > 10) {
1937+
CounterUpdate head = aggregatedCounterUpdateList.get(0);
1938+
this.counterAggregationErrorCount.getAndIncrement();
1939+
// log warning message only when error count is the power of 2 to avoid spamming.
1940+
if (this.counterAggregationErrorCount.get() > 10
1941+
&& Long.bitCount(this.counterAggregationErrorCount.get()) == 1) {
1942+
LOG.warn(
1943+
"Found non-aggregated counter updates of size {} with kind {}, this will likely "
1944+
+ "cause performance degradation and excessive GC if size is large.",
1945+
counterUpdateList.size(),
1946+
MoreObjects.firstNonNull(
1947+
head.getNameAndKind(), head.getStructuredNameAndMetadata()));
1948+
}
1949+
}
19111950

1912-
counterUpdates.add(item);
1951+
counterUpdates.addAll(aggregatedCounterUpdateList);
19131952
}
19141953
}
19151954
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.counters;
19+
20+
import com.google.api.services.dataflow.model.CounterUpdate;
21+
import java.util.List;
22+
23+
/**
24+
* CounterUpdateAggregator performs aggregation over a list of CounterUpdate and return combined
25+
* result.
26+
*/
27+
interface CounterUpdateAggregator {
28+
29+
/**
30+
* Implementation of aggregate function should provide logic to take the list of CounterUpdates
31+
* and return single combined CounterUpdate object. Reporting the aggregated result to Dataflow
32+
* should have same effect as reporting the elements in list individually to Dataflow.
33+
*
34+
* @param counterUpdates CounterUpdates to aggregate.
35+
* @return Aggregated CounterUpdate.
36+
*/
37+
CounterUpdate aggregate(List<CounterUpdate> counterUpdates);
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.counters;
19+
20+
import com.google.api.services.dataflow.model.CounterUpdate;
21+
import com.google.common.collect.ImmutableMap;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Map;
25+
import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
26+
27+
public class CounterUpdateAggregators {
28+
29+
private static final Map<String, CounterUpdateAggregator> aggregators =
30+
ImmutableMap.of(
31+
Kind.SUM.toString(), new SumCounterUpdateAggregator(),
32+
Kind.MEAN.toString(), new MeanCounterUpdateAggregator(),
33+
Kind.DISTRIBUTION.toString(), new DistributionCounterUpdateAggregator());
34+
35+
private static String getCounterUpdateKind(CounterUpdate counterUpdate) {
36+
if (counterUpdate.getStructuredNameAndMetadata() != null
37+
&& counterUpdate.getStructuredNameAndMetadata().getMetadata() != null) {
38+
return counterUpdate.getStructuredNameAndMetadata().getMetadata().getKind();
39+
}
40+
if (counterUpdate.getNameAndKind() != null) {
41+
return counterUpdate.getNameAndKind().getKind();
42+
}
43+
throw new IllegalArgumentException(
44+
"CounterUpdate must have either StructuredNameAndMetadata or NameAndKind.");
45+
}
46+
47+
/**
48+
* Try to aggregate a List of CounterUpdates. The first CounterUpdate entry of the List will be
49+
* examined to identify the CounterUpdate kind with {@link #getCounterUpdateKind(CounterUpdate)}
50+
* and find the suitable {@link CounterUpdateAggregator}, if there is no suitable aggregator the
51+
* original list will be returned.
52+
*
53+
* <p>Note that this method assumes the CounterUpdate elements in this list has the same {@link
54+
* com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata
55+
* StructruredNameAndMetadata} or {@link com.google.api.services.dataflow.model.NameAndKind
56+
* NameAndKind}, also the value type should be the same across all the elements.
57+
*
58+
* @param counterUpdates List of CounterUpdate to be aggregated.
59+
* @return A singleton list of combined CounterUpdate if it is possible to aggregate the elements,
60+
* other wise return the original list.
61+
*/
62+
public static List<CounterUpdate> aggregate(List<CounterUpdate> counterUpdates) {
63+
if (counterUpdates == null || counterUpdates.isEmpty()) {
64+
return counterUpdates;
65+
}
66+
CounterUpdate first = counterUpdates.get(0);
67+
String kind = getCounterUpdateKind(first);
68+
if (aggregators.containsKey(kind)) {
69+
// Return list containing combined CounterUpdate
70+
return Collections.singletonList(aggregators.get(kind).aggregate(counterUpdates));
71+
}
72+
// not able to aggregate the counter updates.
73+
return counterUpdates;
74+
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.counters;
19+
20+
import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
21+
import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.splitIntToLong;
22+
23+
import com.google.api.services.dataflow.model.CounterUpdate;
24+
import com.google.api.services.dataflow.model.DistributionUpdate;
25+
import java.util.List;
26+
27+
public class DistributionCounterUpdateAggregator implements CounterUpdateAggregator {
28+
29+
@Override
30+
public CounterUpdate aggregate(List<CounterUpdate> counterUpdates) {
31+
32+
if (counterUpdates == null || counterUpdates.isEmpty()) {
33+
return null;
34+
}
35+
if (counterUpdates.stream().anyMatch(c -> c.getDistribution() == null)) {
36+
throw new UnsupportedOperationException(
37+
"Aggregating DISTRIBUTION counter updates over non-distribution type is not implemented.");
38+
}
39+
CounterUpdate initial = counterUpdates.remove(0);
40+
return counterUpdates.stream()
41+
.reduce(
42+
initial,
43+
(first, second) ->
44+
first.setDistribution(
45+
new DistributionUpdate()
46+
.setCount(
47+
longToSplitInt(
48+
splitIntToLong(first.getDistribution().getCount())
49+
+ splitIntToLong(second.getDistribution().getCount())))
50+
.setMax(
51+
longToSplitInt(
52+
Math.max(
53+
splitIntToLong(first.getDistribution().getMax()),
54+
splitIntToLong(second.getDistribution().getMax()))))
55+
.setMin(
56+
longToSplitInt(
57+
Math.min(
58+
splitIntToLong(first.getDistribution().getMin()),
59+
splitIntToLong(second.getDistribution().getMin()))))
60+
.setSum(
61+
longToSplitInt(
62+
splitIntToLong(first.getDistribution().getSum())
63+
+ splitIntToLong(second.getDistribution().getSum())))));
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.counters;
19+
20+
import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
21+
import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.splitIntToLong;
22+
23+
import com.google.api.services.dataflow.model.CounterUpdate;
24+
import com.google.api.services.dataflow.model.IntegerMean;
25+
import java.util.List;
26+
27+
public class MeanCounterUpdateAggregator implements CounterUpdateAggregator {
28+
29+
@Override
30+
public CounterUpdate aggregate(List<CounterUpdate> counterUpdates) {
31+
if (counterUpdates == null || counterUpdates.isEmpty()) {
32+
return null;
33+
}
34+
if (counterUpdates.stream().anyMatch(c -> c.getIntegerMean() == null)) {
35+
throw new UnsupportedOperationException(
36+
"Aggregating MEAN counter updates over non-integerMean type is not implemented.");
37+
}
38+
39+
CounterUpdate initial = counterUpdates.remove(0);
40+
return counterUpdates.stream()
41+
.reduce(
42+
initial,
43+
(first, second) ->
44+
first.setIntegerMean(
45+
new IntegerMean()
46+
.setCount(
47+
longToSplitInt(
48+
splitIntToLong(first.getIntegerMean().getCount())
49+
+ splitIntToLong(second.getIntegerMean().getCount())))
50+
.setSum(
51+
longToSplitInt(
52+
splitIntToLong(first.getIntegerMean().getSum())
53+
+ splitIntToLong(second.getIntegerMean().getSum())))));
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.counters;
19+
20+
import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
21+
import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.splitIntToLong;
22+
23+
import com.google.api.services.dataflow.model.CounterUpdate;
24+
import java.util.List;
25+
26+
public class SumCounterUpdateAggregator implements CounterUpdateAggregator {
27+
28+
@Override
29+
public CounterUpdate aggregate(List<CounterUpdate> counterUpdates) {
30+
if (counterUpdates == null || counterUpdates.isEmpty()) {
31+
return null;
32+
}
33+
if (counterUpdates.stream().anyMatch(c -> c.getInteger() == null)) {
34+
throw new UnsupportedOperationException(
35+
"Aggregating SUM counter updates over non-integer type is not implemented.");
36+
}
37+
38+
CounterUpdate initial = counterUpdates.remove(0);
39+
return counterUpdates.stream()
40+
.reduce(
41+
initial,
42+
(first, second) ->
43+
first.setInteger(
44+
longToSplitInt(
45+
splitIntToLong(first.getInteger()) + splitIntToLong(second.getInteger()))));
46+
}
47+
}

0 commit comments

Comments
 (0)