Skip to content

Commit 7ca3068

Browse files
authored
Revert "[ManagedIO] Fail expansion when encountering extra or unknown configu…" (#34802) (#34805)
This reverts commit 363bcc0.
1 parent f7c109d commit 7ca3068

7 files changed

Lines changed: 24 additions & 207 deletions

File tree

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,11 @@ public void testBuildTransformWithManaged() {
304304
+ "bootstrap_servers: some bootstrap\n"
305305
+ "schema: '{\"type\":\"record\",\"name\":\"my_record\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\"}]}'",
306306
"topic: topic_3\n"
307+
+ "bootstrap_servers: some bootstrap\n"
308+
+ "schema_registry_url: some-url\n"
309+
+ "schema_registry_subject: some-subject\n"
310+
+ "format: RAW",
311+
"topic: topic_4\n"
307312
+ "bootstrap_servers: some bootstrap\n"
308313
+ "format: PROTO\n"
309314
+ "schema: '"

sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,6 @@ public abstract static class ManagedTransform extends PTransform<PInput, PCollec
176176
@VisibleForTesting
177177
abstract List<String> getSupportedIdentifiers();
178178

179-
abstract @Nullable Boolean getSkipConfigValidation();
180-
181179
abstract Builder toBuilder();
182180

183181
@AutoValue.Builder
@@ -191,8 +189,6 @@ abstract static class Builder {
191189
@VisibleForTesting
192190
abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
193191

194-
abstract Builder setSkipConfigValidation(boolean skip);
195-
196192
abstract ManagedTransform build();
197193
}
198194

@@ -219,14 +215,6 @@ ManagedTransform withSupportedIdentifiers(List<String> supportedIdentifiers) {
219215
return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
220216
}
221217

222-
/**
223-
* Skips configuration validation. If unset, the pipeline will fail at construction time if the
224-
* configuration includes unknown fields or missing required fields.
225-
*/
226-
public ManagedTransform skipConfigValidation() {
227-
return toBuilder().setSkipConfigValidation(true).build();
228-
}
229-
230218
@Override
231219
public PCollectionRowTuple expand(PInput input) {
232220
PCollectionRowTuple inputTuple = resolveInput(input);
@@ -236,7 +224,6 @@ public PCollectionRowTuple expand(PInput input) {
236224
.setTransformIdentifier(getIdentifier())
237225
.setConfig(YamlUtils.yamlStringFromMap(getConfig()))
238226
.setConfigUrl(getConfigUrl())
239-
.setSkipConfigValidation(getSkipConfigValidation())
240227
.build();
241228

242229
SchemaTransform underlyingTransform =

sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java

Lines changed: 14 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,15 @@
2626
import java.io.InputStream;
2727
import java.nio.ByteBuffer;
2828
import java.nio.charset.StandardCharsets;
29-
import java.util.ArrayList;
3029
import java.util.Arrays;
3130
import java.util.Collection;
3231
import java.util.HashMap;
33-
import java.util.HashSet;
3432
import java.util.List;
3533
import java.util.Map;
3634
import java.util.ServiceLoader;
37-
import java.util.Set;
3835
import javax.annotation.Nullable;
3936
import org.apache.beam.sdk.io.FileSystems;
4037
import org.apache.beam.sdk.io.fs.MatchResult;
41-
import org.apache.beam.sdk.options.PipelineOptions;
4238
import org.apache.beam.sdk.schemas.AutoValueSchema;
4339
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
4440
import org.apache.beam.sdk.schemas.Schema;
@@ -102,11 +98,6 @@ public static Builder builder() {
10298
@SchemaFieldDescription("YAML string config used to build the underlying SchemaTransform.")
10399
public abstract @Nullable String getConfig();
104100

105-
@SchemaFieldDescription(
106-
"Skips configuration validation. If unset, the pipeline will fail at construction "
107-
+ "time if the configuration includes unknown fields or missing required fields.")
108-
public abstract @Nullable Boolean getSkipConfigValidation();
109-
110101
@AutoValue.Builder
111102
public abstract static class Builder {
112103
public abstract Builder setTransformIdentifier(String identifier);
@@ -115,8 +106,6 @@ public abstract static class Builder {
115106

116107
public abstract Builder setConfig(@Nullable String yamlConfig);
117108

118-
public abstract Builder setSkipConfigValidation(@Nullable Boolean skip);
119-
120109
public abstract ManagedConfig build();
121110
}
122111

@@ -165,21 +154,28 @@ protected SchemaTransform from(ManagedConfig managedConfig) {
165154

166155
static class ManagedSchemaTransform extends SchemaTransform {
167156
private final ManagedConfig managedConfig;
157+
private final Row underlyingRowConfig;
168158
private final SchemaTransformProvider underlyingTransformProvider;
169159

170160
ManagedSchemaTransform(
171161
ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) {
162+
// parse config before expansion to check if it matches underlying transform's config schema
163+
Schema transformConfigSchema = underlyingTransformProvider.configurationSchema();
164+
Row underlyingRowConfig;
165+
try {
166+
underlyingRowConfig = getRowConfig(managedConfig, transformConfigSchema);
167+
} catch (Exception e) {
168+
throw new IllegalArgumentException(
169+
"Encountered an error when retrieving a configuration", e);
170+
}
171+
172+
this.underlyingRowConfig = underlyingRowConfig;
172173
this.underlyingTransformProvider = underlyingTransformProvider;
173174
this.managedConfig = managedConfig;
174175
}
175176

176177
@Override
177178
public PCollectionRowTuple expand(PCollectionRowTuple input) {
178-
Row underlyingRowConfig =
179-
getRowConfig(
180-
managedConfig,
181-
underlyingTransformProvider.configurationSchema(),
182-
input.getPipeline().getOptions());
183179
LOG.debug(
184180
"Building transform \"{}\" with configuration: {}",
185181
underlyingTransformProvider.identifier(),
@@ -188,7 +184,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
188184
return input.apply(underlyingTransformProvider.from(underlyingRowConfig));
189185
}
190186

191-
@VisibleForTesting
192187
public ManagedConfig getManagedConfig() {
193188
return this.managedConfig;
194189
}
@@ -211,8 +206,7 @@ Row getConfigurationRow() {
211206
// May return an empty row (perhaps the underlying transform doesn't have any required
212207
// parameters)
213208
@VisibleForTesting
214-
static Row getRowConfig(
215-
ManagedConfig config, Schema transformConfigSchema, PipelineOptions options) {
209+
static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
216210
Map<String, Object> configMap = config.resolveUnderlyingConfig();
217211
// Build a config Row that will be used to build the underlying SchemaTransform.
218212
// If a mapping for the SchemaTransform exists, we use it to update parameter names to align
@@ -231,42 +225,7 @@ static Row getRowConfig(
231225
configMap = remappedConfig;
232226
}
233227

234-
@Nullable Boolean skipValidation = config.getSkipConfigValidation();
235-
if (skipValidation == null || !skipValidation) {
236-
validateUserConfig(
237-
config.getTransformIdentifier(),
238-
new HashSet<>(configMap.keySet()),
239-
transformConfigSchema);
240-
}
241-
242-
return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
243-
}
244-
245-
static void validateUserConfig(
246-
String transformId, Set<String> userParams, Schema transformConfigSchema) {
247-
List<String> missingRequiredFields = new ArrayList<>();
248-
for (Schema.Field field : transformConfigSchema.getFields()) {
249-
boolean inUserConfig = userParams.remove(field.getName());
250-
if (!field.getType().getNullable() && !inUserConfig) {
251-
missingRequiredFields.add(field.getName());
252-
}
253-
}
254-
255-
if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
256-
String msg = "Invalid config for transform '" + transformId + "':";
257-
if (!missingRequiredFields.isEmpty()) {
258-
msg += " Missing required fields: " + missingRequiredFields + ".";
259-
}
260-
if (!userParams.isEmpty()) {
261-
msg +=
262-
" Contains unknown fields: "
263-
+ userParams
264-
+ ". If you'd still like to pass "
265-
+ "these fields, use '.skipConfigValidation()'";
266-
}
267-
268-
throw new IllegalArgumentException(msg);
269-
}
228+
return YamlUtils.toBeamRow(configMap, transformSchema, false);
270229
}
271230

272231
public static Map<String, Map<String, String>> getAliases() {

sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/testing/TestSchemaTransformProvider.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package org.apache.beam.sdk.managed.testing;
1919

20-
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
21-
2220
import com.google.auto.service.AutoService;
2321
import com.google.auto.value.AutoValue;
2422
import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -33,7 +31,6 @@
3331
import org.apache.beam.sdk.values.PCollectionRowTuple;
3432
import org.apache.beam.sdk.values.Row;
3533
import org.apache.beam.sdk.values.TypeDescriptors;
36-
import org.checkerframework.checker.nullness.qual.Nullable;
3734

3835
@AutoService(SchemaTransformProvider.class)
3936
public class TestSchemaTransformProvider
@@ -55,27 +52,19 @@ public static Builder builder() {
5552
@SchemaFieldDescription("Integer to add to each row element.")
5653
public abstract Integer getExtraInteger();
5754

58-
@SchemaFieldDescription("If true, will upper case the extra string. Default is false.")
59-
public abstract @Nullable Boolean getToggleUppercase();
60-
6155
@AutoValue.Builder
6256
public abstract static class Builder {
6357
public abstract Builder setExtraString(String extraString);
6458

6559
public abstract Builder setExtraInteger(Integer extraInteger);
6660

67-
public abstract Builder setToggleUppercase(Boolean toggleUppercase);
68-
6961
public abstract Config build();
7062
}
7163
}
7264

7365
@Override
7466
public SchemaTransform from(Config config) {
75-
String extraString =
76-
firstNonNull(config.getToggleUppercase(), false)
77-
? config.getExtraString().toUpperCase()
78-
: config.getExtraString();
67+
String extraString = config.getExtraString();
7968
Integer extraInteger = config.getExtraInteger();
8069
return new SchemaTransform() {
8170
@Override

sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java

Lines changed: 2 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,9 @@
2424
import java.net.URISyntaxException;
2525
import java.nio.file.Paths;
2626
import java.util.Arrays;
27-
import java.util.Map;
28-
import org.apache.beam.sdk.Pipeline;
2927
import org.apache.beam.sdk.managed.testing.TestSchemaTransformProvider;
30-
import org.apache.beam.sdk.options.PipelineOptionsFactory;
3128
import org.apache.beam.sdk.schemas.Schema;
32-
import org.apache.beam.sdk.schemas.utils.YamlUtils;
33-
import org.apache.beam.sdk.transforms.Create;
34-
import org.apache.beam.sdk.values.PCollectionRowTuple;
3529
import org.apache.beam.sdk.values.Row;
36-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
3730
import org.junit.Rule;
3831
import org.junit.Test;
3932
import org.junit.rules.ExpectedException;
@@ -43,8 +36,6 @@
4336
@RunWith(JUnit4.class)
4437
public class ManagedSchemaTransformProviderTest {
4538
@Rule public transient ExpectedException thrown = ExpectedException.none();
46-
private static final Schema EMPTY_SCHEMA = Schema.builder().build();
47-
private static final Row EMPTY_ROW = Row.nullRow(EMPTY_SCHEMA);
4839

4940
@Test
5041
public void testFailWhenNoConfigSpecified() {
@@ -58,103 +49,6 @@ public void testFailWhenNoConfigSpecified() {
5849
config.validate();
5950
}
6051

61-
@Test
62-
public void testFailWhenUnknownFieldsSpecified() {
63-
Map<String, Object> config =
64-
ImmutableMap.of(
65-
"extra_string",
66-
"str",
67-
"extra_integer",
68-
123,
69-
"toggle_uppercase",
70-
true,
71-
"unknown_field",
72-
"unknown");
73-
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
74-
ManagedSchemaTransformProvider.ManagedConfig.builder()
75-
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
76-
.setConfig(YamlUtils.yamlStringFromMap(config))
77-
.build();
78-
79-
thrown.expect(IllegalArgumentException.class);
80-
thrown.expectMessage("Invalid config for transform");
81-
thrown.expectMessage(TestSchemaTransformProvider.IDENTIFIER);
82-
thrown.expectMessage("Contains unknown fields");
83-
thrown.expectMessage("unknown_field");
84-
Pipeline p = Pipeline.create();
85-
new ManagedSchemaTransformProvider(null)
86-
.from(managedConfig)
87-
.expand(
88-
PCollectionRowTuple.of(
89-
"input", p.apply(Create.of(EMPTY_ROW).withRowSchema(EMPTY_SCHEMA))));
90-
}
91-
92-
@Test
93-
public void testFailWhenMissingRequiredFields() {
94-
Map<String, Object> config = ImmutableMap.of("extra_string", "str", "toggle_uppercase", true);
95-
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
96-
ManagedSchemaTransformProvider.ManagedConfig.builder()
97-
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
98-
.setConfig(YamlUtils.yamlStringFromMap(config))
99-
.build();
100-
101-
thrown.expect(IllegalArgumentException.class);
102-
thrown.expectMessage("Invalid config for transform");
103-
thrown.expectMessage(TestSchemaTransformProvider.IDENTIFIER);
104-
thrown.expectMessage("Missing required fields");
105-
thrown.expectMessage("extra_integer");
106-
Pipeline p = Pipeline.create();
107-
new ManagedSchemaTransformProvider(null)
108-
.from(managedConfig)
109-
.expand(
110-
PCollectionRowTuple.of(
111-
"input", p.apply(Create.of(EMPTY_ROW).withRowSchema(EMPTY_SCHEMA))));
112-
}
113-
114-
@Test
115-
public void testPassWhenMissingNullableFields() {
116-
Map<String, Object> config = ImmutableMap.of("extra_string", "str", "extra_integer", 123);
117-
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
118-
ManagedSchemaTransformProvider.ManagedConfig.builder()
119-
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
120-
.setConfig(YamlUtils.yamlStringFromMap(config))
121-
.build();
122-
123-
Pipeline p = Pipeline.create();
124-
new ManagedSchemaTransformProvider(null)
125-
.from(managedConfig)
126-
.expand(
127-
PCollectionRowTuple.of(
128-
"input", p.apply(Create.of(EMPTY_ROW).withRowSchema(EMPTY_SCHEMA))));
129-
}
130-
131-
@Test
132-
public void testSkipConfigValidationWithUnknownFields() {
133-
Map<String, Object> config =
134-
ImmutableMap.of(
135-
"extra_string",
136-
"str",
137-
"extra_integer",
138-
123,
139-
"toggle_uppercase",
140-
true,
141-
"unknown_field",
142-
"unknown");
143-
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
144-
ManagedSchemaTransformProvider.ManagedConfig.builder()
145-
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
146-
.setConfig(YamlUtils.yamlStringFromMap(config))
147-
.setSkipConfigValidation(true)
148-
.build();
149-
150-
Pipeline p = Pipeline.create();
151-
new ManagedSchemaTransformProvider(null)
152-
.from(managedConfig)
153-
.expand(
154-
PCollectionRowTuple.of(
155-
"input", p.apply(Create.of(EMPTY_ROW).withRowSchema(EMPTY_SCHEMA))));
156-
}
157-
15852
@Test
15953
public void testGetConfigRowFromYamlString() {
16054
String yamlString = "extra_string: abc\n" + "extra_integer: 123";
@@ -171,8 +65,7 @@ public void testGetConfigRowFromYamlString() {
17165
.build();
17266

17367
Row returnedRow =
174-
ManagedSchemaTransformProvider.getRowConfig(
175-
config, TestSchemaTransformProvider.SCHEMA, PipelineOptionsFactory.create());
68+
ManagedSchemaTransformProvider.getRowConfig(config, TestSchemaTransformProvider.SCHEMA);
17669

17770
assertEquals(expectedRow, returnedRow);
17871
}
@@ -195,8 +88,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
19588
.withFieldValue("extra_integer", 123)
19689
.build();
19790
Row configRow =
198-
ManagedSchemaTransformProvider.getRowConfig(
199-
config, TestSchemaTransformProvider.SCHEMA, PipelineOptionsFactory.create());
91+
ManagedSchemaTransformProvider.getRowConfig(config, TestSchemaTransformProvider.SCHEMA);
20092

20193
assertEquals(expectedRow, configRow);
20294
}

0 commit comments

Comments
 (0)