Skip to content

Commit e859735

Browse files
author
Kyle Weaver
authored
Merge pull request #11703 from ibzib/BEAM-9001
[BEAM-9001, BEAM-6327] Ensure that all transforms (except for require…
2 parents 8f387f0 + d87c0ab commit e859735

10 files changed

Lines changed: 106 additions & 70 deletions

File tree

model/pipeline/src/main/proto/beam_runner_api.proto

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,9 @@ message PTransform {
151151
// details.
152152
FunctionSpec spec = 1;
153153

154-
// (Optional) if this node is a composite, a list of the ids of
155-
// transforms that it contains.
154+
// (Optional) A list of the ids of transforms that it contains.
155+
//
156+
// Primitive transforms are not allowed to specify this.
156157
repeated string subtransforms = 2;
157158

158159
// (Required) A map from local names of inputs (unique only with this map, and
@@ -184,9 +185,10 @@ message PTransform {
184185
// there is none, it may be omitted.
185186
repeated DisplayData display_data = 6;
186187

187-
// (Optional) Environment where the current PTransform should be executed in.
188-
// Runner that executes the pipeline may choose to override this if needed. If
189-
// not specified, environment will be decided by the runner.
188+
// Environment where the current PTransform should be executed in.
189+
//
190+
// Transforms that are required to be implemented by a runner must omit this.
191+
// All other transforms are required to specify this.
190192
string environment_id = 7;
191193
}
192194

@@ -227,12 +229,18 @@ message StandardPTransforms {
227229
// See https://beam.apache.org/documentation/programming-guide/#groupbykey
228230
// for additional details.
229231
//
232+
// Never defines an environment as the runner is required to implement this
233+
// transform.
234+
//
230235
// Payload: None
231236
GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];
232237

233238
// A transform which produces a single empty byte array at the minimum
234239
// timestamp in the GlobalWindow.
235240
//
241+
// Never defines an environment as the runner is required to implement this
242+
// transform.
243+
//
236244
// Payload: None
237245
IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];
238246

runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@
5050
import org.apache.beam.sdk.values.PValue;
5151
import org.apache.beam.sdk.values.TupleTag;
5252
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
53-
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
5453
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
54+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
5555
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedSet;
5656
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
5757
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
@@ -74,6 +74,10 @@ public class PTransformTranslation {
7474
public static final String MAP_WINDOWS_TRANSFORM_URN = "beam:transform:map_windows:v1";
7575
public static final String MERGE_WINDOWS_TRANSFORM_URN = "beam:transform:merge_windows:v1";
7676

77+
// Required runner implemented transforms. These transforms should never specify an environment.
78+
public static final ImmutableSet<String> RUNNER_IMPLEMENTED_TRANSFORMS =
79+
ImmutableSet.of(GROUP_BY_KEY_TRANSFORM_URN, IMPULSE_TRANSFORM_URN);
80+
7781
// DeprecatedPrimitives
7882
/**
7983
* @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse
@@ -350,10 +354,15 @@ public RunnerApi.PTransform translate(
350354

351355
// A composite transform is permitted to have a null spec. There are also some pseudo-
352356
// primitives not yet supported by the portability framework that have null specs
357+
String urn = "";
353358
if (spec != null) {
359+
urn = spec.getUrn();
354360
transformBuilder.setSpec(spec);
355361
}
356-
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
362+
363+
if (!RUNNER_IMPLEMENTED_TRANSFORMS.contains(urn)) {
364+
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
365+
}
357366
return transformBuilder.build();
358367
}
359368
}
@@ -367,11 +376,6 @@ private static class KnownTransformPayloadTranslator<T extends PTransform<?, ?>>
367376
private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
368377
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
369378

370-
// TODO: BEAM-9001 - set environment ID in all transforms and allow runners to override.
371-
private static List<String> sdkTransformsWithEnvironment =
372-
ImmutableList.of(
373-
PAR_DO_TRANSFORM_URN, COMBINE_PER_KEY_TRANSFORM_URN, ASSIGN_WINDOWS_TRANSFORM_URN);
374-
375379
private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
376380
loadTransformPayloadTranslators() {
377381
HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators =
@@ -423,14 +427,20 @@ public RunnerApi.PTransform translate(
423427
if (spec != null) {
424428
transformBuilder.setSpec(spec);
425429

426-
if (sdkTransformsWithEnvironment.contains(spec.getUrn())) {
427-
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
428-
} else if (spec.getUrn().equals(READ_TRANSFORM_URN)
429-
&& (appliedPTransform.getTransform().getClass() == Read.Bounded.class)) {
430-
// Only assigning environment to Bounded reads. Not assigning an environment to Unbounded
431-
// reads since they are a Runner translated transform, unless, in the future, we have an
432-
// adapter available for splittable DoFn.
433-
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
430+
// Required runner implemented transforms should not have an environment id.
431+
if (!RUNNER_IMPLEMENTED_TRANSFORMS.contains(spec.getUrn())) {
432+
// TODO(BEAM-9309): Remove existing hacks around deprecated READ transform.
433+
if (spec.getUrn().equals(READ_TRANSFORM_URN)) {
434+
// Only assigning environment to Bounded reads. Not assigning an environment to
435+
// Unbounded
436+
// reads since they are a Runner translated transform, unless, in the future, we have an
437+
// adapter available for splittable DoFn.
438+
if (appliedPTransform.getTransform().getClass() == Read.Bounded.class) {
439+
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
440+
}
441+
} else {
442+
transformBuilder.setEnvironmentId(components.getOnlyEnvironmentId());
443+
}
434444
}
435445
}
436446
return transformBuilder.build();

runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,15 @@ private static void validateTransform(
208208
}
209209

210210
String urn = transform.getSpec().getUrn();
211+
if (PTransformTranslation.RUNNER_IMPLEMENTED_TRANSFORMS.contains(urn)) {
212+
checkArgument(
213+
transform.getEnvironmentId().isEmpty(),
214+
"Transform %s references environment %s when no environment should be specified since it is a required runner implemented transform %s.",
215+
id,
216+
transform.getEnvironmentId(),
217+
urn);
218+
}
219+
211220
if (VALIDATORS.containsKey(urn)) {
212221
try {
213222
VALIDATORS.get(urn).validate(id, transform, components, requirements);

runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java renamed to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,47 +23,52 @@
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

26-
// TODO(BEAM-6327): Remove the need for this.
27-
28-
/** PipelineTrimmer removes subcomponents of native transforms that shouldn't be fused. */
29-
public class PipelineTrimmer {
30-
private static final Logger LOG = LoggerFactory.getLogger(PipelineTrimmer.class);
26+
/**
27+
* TrivialNativeTransformExpander is used to replace transforms with known URNs with their native
28+
* equivalent.
29+
*/
30+
public class TrivialNativeTransformExpander {
31+
private static final Logger LOG = LoggerFactory.getLogger(TrivialNativeTransformExpander.class);
3132

3233
/**
33-
* Remove subcomponents of native transforms that shouldn't be fused.
34+
* Replaces transforms with the known URN with a native equivalent stripping the environment and
35+
* removing any sub-transforms from the returned pipeline.
3436
*
3537
* @param pipeline the pipeline to be trimmed
3638
* @param knownUrns set of URNs for the runner's native transforms
3739
* @return the trimmed pipeline
3840
*/
39-
public static Pipeline trim(Pipeline pipeline, Set<String> knownUrns) {
41+
public static Pipeline forKnownUrns(Pipeline pipeline, Set<String> knownUrns) {
4042
return makeKnownUrnsPrimitives(pipeline, knownUrns);
4143
}
4244

4345
private static RunnerApi.Pipeline makeKnownUrnsPrimitives(
4446
RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
4547
RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
4648
for (String ptransformId : pipeline.getComponents().getTransformsMap().keySet()) {
47-
if (knownUrns.contains(
48-
pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
49-
LOG.debug("Removing descendants of known PTransform {}" + ptransformId);
49+
// Skip over previously removed transforms from the original pipeline since we iterate
50+
// over all transforms from the original pipeline and not the trimmed down version.
51+
RunnerApi.PTransform currentTransform =
52+
trimmedPipeline.getComponents().getTransformsOrDefault(ptransformId, null);
53+
if (currentTransform != null && knownUrns.contains(currentTransform.getSpec().getUrn())) {
54+
LOG.debug(
55+
"Removing descendants and environment of known native PTransform {}" + ptransformId);
5056
removeDescendants(trimmedPipeline, ptransformId);
57+
trimmedPipeline
58+
.getComponentsBuilder()
59+
.putTransforms(
60+
ptransformId,
61+
currentTransform.toBuilder().clearSubtransforms().clearEnvironmentId().build());
5162
}
5263
}
5364
return trimmedPipeline.build();
5465
}
5566

5667
private static void removeDescendants(RunnerApi.Pipeline.Builder pipeline, String parentId) {
57-
RunnerApi.PTransform parentProto =
58-
pipeline.getComponents().getTransformsOrDefault(parentId, null);
59-
if (parentProto != null) {
60-
for (String childId : parentProto.getSubtransformsList()) {
61-
removeDescendants(pipeline, childId);
62-
pipeline.getComponentsBuilder().removeTransforms(childId);
63-
}
64-
pipeline
65-
.getComponentsBuilder()
66-
.putTransforms(parentId, parentProto.toBuilder().clearSubtransforms().build());
68+
RunnerApi.PTransform parentProto = pipeline.getComponents().getTransformsOrThrow(parentId);
69+
for (String childId : parentProto.getSubtransformsList()) {
70+
removeDescendants(pipeline, childId);
71+
pipeline.getComponentsBuilder().removeTransforms(childId);
6772
}
6873
}
6974
}

runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,11 @@
5050
import org.apache.beam.sdk.transforms.Create;
5151
import org.apache.beam.sdk.transforms.DoFn;
5252
import org.apache.beam.sdk.transforms.Flatten;
53+
import org.apache.beam.sdk.transforms.GroupByKey;
5354
import org.apache.beam.sdk.transforms.MapElements;
5455
import org.apache.beam.sdk.transforms.ParDo;
5556
import org.apache.beam.sdk.transforms.View;
57+
import org.apache.beam.sdk.transforms.WithKeys;
5658
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
5759
import org.apache.beam.sdk.transforms.windowing.Window;
5860
import org.apache.beam.sdk.values.PBegin;
@@ -342,15 +344,15 @@ public void getProducer() {
342344
public void getEnvironmentWithEnvironment() {
343345
Pipeline p = Pipeline.create();
344346
PCollection<Long> longs = p.apply("BoundedRead", Read.from(CountingSource.upTo(100L)));
345-
PCollectionList.of(longs).and(longs).and(longs).apply("flatten", Flatten.pCollections());
347+
longs.apply(WithKeys.of("a")).apply("groupByKey", GroupByKey.create());
346348

347349
Components components = PipelineTranslation.toProto(p).getComponents();
348350
QueryablePipeline qp = QueryablePipeline.forPrimitivesIn(components);
349351

350352
PTransformNode environmentalRead =
351353
PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead"));
352354
PTransformNode nonEnvironmentalTransform =
353-
PipelineNode.pTransform("flatten", components.getTransformsOrThrow("flatten"));
355+
PipelineNode.pTransform("groupByKey", components.getTransformsOrThrow("groupByKey"));
354356

355357
assertThat(qp.getEnvironment(environmentalRead).isPresent(), is(true));
356358
assertThat(

runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
3131
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
3232
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
33-
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
3433
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
3534
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
35+
import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
3636
import org.apache.beam.runners.core.metrics.MetricsPusher;
3737
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
3838
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
@@ -98,7 +98,8 @@ PortablePipelineResult runPipelineWithTranslator(
9898

9999
// Don't let the fuser fuse any subcomponents of native transforms.
100100
Pipeline trimmedPipeline =
101-
PipelineTrimmer.trim(pipelineWithSdfExpanded, translator.knownUrns());
101+
TrivialNativeTransformExpander.forKnownUrns(
102+
pipelineWithSdfExpanded, translator.knownUrns());
102103

103104
// Fused pipeline proto.
104105
// TODO: Consider supporting partially-fused graphs.

runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ interface PTransformTranslator<T> {
228228

229229
@Override
230230
public Set<String> knownUrns() {
231-
// Do not expose Read as a known URN because PipelineTrimmer otherwise removes
231+
// Do not expose Read as a known URN because TrivialNativeTransformExpander otherwise removes
232232
// the subtransforms which are added in case of bounded reads. We only have a
233233
// translator here for unbounded Reads which are native transforms which do not
234234
// have subtransforms. Unbounded Reads are used by cross-language transforms, e.g.

runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
3131
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
3232
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
33-
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
3433
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
3534
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
35+
import org.apache.beam.runners.core.construction.graph.TrivialNativeTransformExpander;
3636
import org.apache.beam.runners.core.metrics.MetricsPusher;
3737
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineJarUtils;
3838
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
@@ -82,7 +82,8 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
8282

8383
// Don't let the fuser fuse any subcomponents of native transforms.
8484
Pipeline trimmedPipeline =
85-
PipelineTrimmer.trim(pipelineWithSdfExpanded, translator.knownUrns());
85+
TrivialNativeTransformExpander.forKnownUrns(
86+
pipelineWithSdfExpanded, translator.knownUrns());
8687

8788
// Fused pipeline proto.
8889
// TODO: Consider supporting partially-fused graphs.

sdks/go/pkg/beam/core/runtime/graphx/translate.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
177177
transform := &pipepb.PTransform{
178178
UniqueName: s.Scope.Name,
179179
Subtransforms: subtransforms,
180+
EnvironmentId: m.addDefaultEnv(),
180181
}
181182

182183
m.updateIfCombineComposite(s, transform)
@@ -208,7 +209,6 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PT
208209
AccumulatorCoderId: acID,
209210
}
210211
transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
211-
transform.EnvironmentId = m.addDefaultEnv()
212212
}
213213

214214
func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
@@ -238,10 +238,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
238238
// allPIds tracks additional PTransformIDs generated for the pipeline
239239
var allPIds []string
240240
var spec *pipepb.FunctionSpec
241-
var transformEnvID = ""
242241
switch edge.Edge.Op {
243242
case graph.Impulse:
244-
// TODO(herohde) 7/18/2018: Encode data?
245243
spec = &pipepb.FunctionSpec{Urn: URNImpulse}
246244

247245
case graph.ParDo:
@@ -315,7 +313,6 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
315313
if edge.Edge.DoFn.IsSplittable() {
316314
payload.RestrictionCoderId = m.coders.Add(edge.Edge.RestrictionCoder)
317315
}
318-
transformEnvID = m.addDefaultEnv()
319316
spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
320317

321318
case graph.Combine:
@@ -325,7 +322,6 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
325322
Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
326323
},
327324
}
328-
transformEnvID = m.addDefaultEnv()
329325
spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
330326

331327
case graph.Flatten:
@@ -347,6 +343,11 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
347343
panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
348344
}
349345

346+
var transformEnvID = ""
347+
if !(spec.Urn == URNGBK || spec.Urn == URNImpulse) {
348+
transformEnvID = m.addDefaultEnv()
349+
}
350+
350351
transform := &pipepb.PTransform{
351352
UniqueName: edge.Name,
352353
Spec: spec,
@@ -413,10 +414,11 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
413414

414415
flattenID := fmt.Sprintf("%v_flatten", id)
415416
flatten := &pipepb.PTransform{
416-
UniqueName: flattenID,
417-
Spec: &pipepb.FunctionSpec{Urn: URNFlatten},
418-
Inputs: inputs,
419-
Outputs: map[string]string{"i0": out},
417+
UniqueName: flattenID,
418+
Spec: &pipepb.FunctionSpec{Urn: URNFlatten},
419+
Inputs: inputs,
420+
Outputs: map[string]string{"i0": out},
421+
EnvironmentId: m.addDefaultEnv(),
420422
}
421423
m.transforms[flattenID] = flatten
422424
subtransforms = append(subtransforms, flattenID)
@@ -468,6 +470,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
468470
m.transforms[cogbkID] = &pipepb.PTransform{
469471
UniqueName: edge.Name,
470472
Subtransforms: subtransforms,
473+
EnvironmentId: m.addDefaultEnv(),
471474
}
472475
return cogbkID
473476
}
@@ -632,6 +635,7 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
632635
Spec: &pipepb.FunctionSpec{
633636
Urn: URNReshuffle,
634637
},
638+
EnvironmentId: m.addDefaultEnv(),
635639
}
636640
return reshuffleID
637641
}

0 commit comments

Comments
 (0)