Skip to content

Commit f02824c

Browse files
committed
GH-1302 Ensure reactive/imperative composition is not subscribed twise
Resolves #1302
1 parent 8223cbc commit f02824c

3 files changed

Lines changed: 25 additions & 6 deletions

File tree

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -741,7 +741,7 @@ public String toString() {
741741
* Returns true if this function wrapper represents a composed function.
742742
* @return true if this function wrapper represents a composed function otherwise false
743743
*/
744-
boolean isComposed() {
744+
public boolean isComposed() {
745745
return this.composed;
746746
}
747747

spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebRequestProcessingHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public static Publisher<?> processRequest(FunctionWrapper wrapper, Object argume
132132

133133
Object result = function.apply(inputMessage);
134134
if (function.isConsumer()) {
135-
if (result instanceof Publisher) {
135+
if (result instanceof Publisher && !function.isComposed()) {
136136
Mono.from((Publisher) result).subscribe();
137137
}
138138
return "DELETE".equals(wrapper.getMethod()) ?

spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,6 @@ public void headers() throws Exception {
180180
ResponseEntity<String> result = this.rest.exchange(RequestEntity
181181
.post(new URI("/headers")).contentType(MediaType.APPLICATION_JSON)
182182
.body("[\"foo\",\"bar\"]"), String.class);
183-
// assertThat(result.getHeaders().getFirst("foo")).isEqualTo("bar");
184-
// assertThat(result.getHeaders()).doesNotContainKey("id");
185183
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
186184
}
187185

@@ -375,7 +373,6 @@ public void multipart() throws Exception {
375373

376374
@Test
377375
@DirtiesContext
378-
@Disabled
379376
public void count() throws Exception {
380377
List<String> list = Arrays.asList("A", "B", "A");
381378
assertThat(this.rest.exchange(
@@ -386,7 +383,6 @@ public void count() throws Exception {
386383

387384
@Test
388385
@DirtiesContext
389-
@Disabled
390386
public void fluxWithList() throws Exception {
391387
List<String> list = Arrays.asList("A", "B", "A");
392388
assertThat(this.rest.exchange(
@@ -395,6 +391,14 @@ public void fluxWithList() throws Exception {
395391
String.class).getBody()).isEqualTo("[\"A\",\"B\",\"A\"]");
396392
}
397393

394+
@Test
395+
@DirtiesContext
396+
public void testReactiveFunctionComposdWithImperativeConsumer() throws Exception {
397+
RequestEntity entity = RequestEntity.post(new URI("/functionReactive,consumerImperative")).build();
398+
this.rest.exchange(entity, String.class);
399+
assertThat(ApplicationConfiguration.functionReactiveInvocations).isEqualTo(1);
400+
}
401+
398402
private String sse(String... values) {
399403
return "[\"" + StringUtils.arrayToDelimitedString(values, "\",\"") + "\"]";
400404
}
@@ -405,11 +409,26 @@ public static class ApplicationConfiguration {
405409

406410
private List<String> list = new ArrayList<>();
407411

412+
private static int functionReactiveInvocations;
413+
408414
public static void main(String[] args) throws Exception {
409415
SpringApplication.run(HttpPostIntegrationTests.ApplicationConfiguration.class,
410416
args);
411417
}
412418

419+
@Bean
420+
public Function<Flux<String>, Flux<String>> functionReactive() {
421+
functionReactiveInvocations = 0;
422+
return flux -> flux.doOnNext(x -> functionReactiveInvocations++);
423+
}
424+
425+
@Bean
426+
public Consumer<String> consumerImperative() {
427+
return value -> {
428+
System.out.println(value);
429+
};
430+
}
431+
413432
@Bean({ "uppercase", "transform", "post/more" })
414433
public Function<Flux<String>, Flux<String>> uppercase() {
415434
return flux -> flux.log()

0 commit comments

Comments
 (0)