Skip to content

Commit 12d0fe4

Browse files
authored
Merge pull request #21936 from [cherry-pick][release-2.40.0][Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
[cherry-pick][release-2.40.0][Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
2 parents f78453f + 1cb7eb6 commit 12d0fe4

2 files changed

Lines changed: 11 additions & 6 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575

7676
* The Go Sdk now requires a minimum version of 1.18 in order to support generics ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)).
7777
* synthetic.SourceConfig field types have changed to int64 from int for better compatibility with Flink's use of Logical types in Schemas (Go) ([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173))
78+
* Default coder updated to compress sources used with `BoundedSourceAsSDFWrapperFn` and `UnboundedSourceAsSDFWrapper`.
7879

7980
## Deprecations
8081

sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.beam.sdk.coders.InstantCoder;
3535
import org.apache.beam.sdk.coders.NullableCoder;
3636
import org.apache.beam.sdk.coders.SerializableCoder;
37+
import org.apache.beam.sdk.coders.SnappyCoder;
3738
import org.apache.beam.sdk.coders.StructuredCoder;
3839
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
3940
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
@@ -148,7 +149,7 @@ public final PCollection<T> expand(PBegin input) {
148149
.getPipeline()
149150
.apply(Impulse.create())
150151
.apply(ParDo.of(new OutputSingleSource<>(source)))
151-
.setCoder(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {}))
152+
.setCoder(SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {})))
152153
.apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<>()))
153154
.setCoder(source.getOutputCoder())
154155
.setTypeDescriptor(source.getOutputCoder().getEncodedTypeDescriptor());
@@ -216,7 +217,9 @@ public final PCollection<T> expand(PBegin input) {
216217
.apply(Impulse.create())
217218
.apply(ParDo.of(new OutputSingleSource<>(source)))
218219
.setCoder(
219-
SerializableCoder.of(new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {}))
220+
SnappyCoder.of(
221+
SerializableCoder.of(
222+
new TypeDescriptor<UnboundedSource<T, CheckpointMark>>() {})))
220223
.apply(ParDo.of(createUnboundedSdfWrapper()))
221224
.setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder()));
222225

@@ -314,7 +317,7 @@ public void processElement(
314317

315318
@GetRestrictionCoder
316319
public Coder<BoundedSourceT> restrictionCoder() {
317-
return SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {});
320+
return SnappyCoder.of(SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {}));
318321
}
319322

320323
/**
@@ -600,9 +603,10 @@ public WatermarkEstimators.Manual newWatermarkEstimator(
600603

601604
@GetRestrictionCoder
602605
public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder() {
603-
return new UnboundedSourceRestrictionCoder<>(
604-
SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
605-
NullableCoder.of(checkpointCoder));
606+
return SnappyCoder.of(
607+
new UnboundedSourceRestrictionCoder<>(
608+
SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}),
609+
NullableCoder.of(checkpointCoder)));
606610
}
607611

608612
/**

0 commit comments

Comments
 (0)