7575import org .apache .beam .sdk .transforms .windowing .GlobalWindow ;
7676import org .apache .beam .sdk .transforms .windowing .GlobalWindows ;
7777import org .apache .beam .sdk .transforms .windowing .Window ;
78+ import org .apache .beam .sdk .transforms .windowing .WindowFn ;
7879import org .apache .beam .sdk .util .BackOff ;
7980import org .apache .beam .sdk .util .BackOffUtils ;
8081import org .apache .beam .sdk .util .FluentBackoff ;
8586import org .apache .beam .sdk .values .PCollectionTuple ;
8687import org .apache .beam .sdk .values .TupleTag ;
8788import org .apache .beam .sdk .values .TupleTagList ;
88- import org .apache .beam .sdk .values .WindowingStrategy ;
8989import org .apache .beam .vendor .guava .v26_0_jre .com .google .common .annotations .VisibleForTesting ;
9090import org .apache .beam .vendor .guava .v26_0_jre .com .google .common .base .Strings ;
9191import org .apache .beam .vendor .guava .v26_0_jre .com .google .common .collect .Streams ;
@@ -2279,10 +2279,13 @@ public PCollectionTuple expand(PCollection<Document> input) {
22792279 ConnectionConfiguration connectionConfiguration = getConnectionConfiguration ();
22802280 checkState (connectionConfiguration != null , "withConnectionConfiguration() is required" );
22812281
2282- WindowingStrategy <?, ?> originalStrategy = input .getWindowingStrategy ();
2282+ @ SuppressWarnings ("unchecked" )
2283+ WindowFn <Document , ?> originalWindowFn =
2284+ (WindowFn <Document , ?>) input .getWindowingStrategy ().getWindowFn ();
22832285
22842286 PCollection <Document > docResults ;
2285- PCollection <Document > globalDocs = input .apply (Window .into (new GlobalWindows ()));
2287+ PCollection <Document > globalDocs =
2288+ input .apply ("Window inputs globally" , Window .into (new GlobalWindows ()));
22862289
22872290 if (getUseStatefulBatches ()) {
22882291 docResults =
@@ -2294,7 +2297,8 @@ public PCollectionTuple expand(PCollection<Document> input) {
22942297 }
22952298
22962299 return docResults
2297- .setWindowingStrategyInternal (originalStrategy )
2300+ // Restore windowing of input
2301+ .apply ("Restore original windows" , Window .into (originalWindowFn ))
22982302 .apply (
22992303 ParDo .of (new ResultFilteringFn ())
23002304 .withOutputTags (Write .SUCCESSFUL_WRITES , TupleTagList .of (Write .FAILED_WRITES )));
0 commit comments