|
53 | 53 | "nullness" // TODO(https://github.com/apache/beam/issues/20497) |
54 | 54 | }) |
55 | 55 | class WindmillTimerInternals implements TimerInternals { |
| 56 | + private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE = |
| 57 | + GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)); |
| 58 | + |
| 59 | + private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE = |
| 60 | + BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)); |
56 | 61 |
|
57 | 62 | private static final String TIMER_HOLD_PREFIX = "/h"; |
58 | 63 | // Map from timer id to its TimerData. If it is to be deleted, we still need |
@@ -286,8 +291,14 @@ static Timer.Builder buildWindmillTimerFromTimerData( |
286 | 291 | builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp())); |
287 | 292 |
|
288 | 293 | // Store the output timestamp in the metadata timestamp. |
289 | | - builder.setMetadataTimestamp( |
290 | | - WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp())); |
| 294 | + Instant outputTimestamp = timerData.getOutputTimestamp(); |
| 295 | + if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { |
| 296 | + // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of |
| 297 | + // the global window |
| 298 | + // here instead. |
| 299 | + outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE; |
| 300 | + } |
| 301 | + builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)); |
291 | 302 | return builder; |
292 | 303 | } |
293 | 304 |
|
@@ -375,7 +386,13 @@ public static TimerData windmillTimerToTimerData( |
375 | 386 | throw new RuntimeException(e); |
376 | 387 | } |
377 | 388 | } else if (timer.hasMetadataTimestamp()) { |
| 389 | + // We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp" so make sure |
| 390 | + // to change the upper |
| 391 | + // bound. |
378 | 392 | outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp()); |
| 393 | + if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) { |
| 394 | + outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE; |
| 395 | + } |
379 | 396 | } |
380 | 397 |
|
381 | 398 | StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder); |
|
0 commit comments