Skip to content

Commit 4b0636f

Browse files
geezylucasolegz
authored andcommitted
GH-1259 feat: add reactor publisher support
Resolves #1259 Resolves #1264 Signed-off-by: geezylucas <sorec@live.com>
1 parent 4f8d647 commit 4b0636f

3 files changed

Lines changed: 211 additions & 38 deletions

File tree

spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/main/java/org/springframework/cloud/function/adapter/gcp/FunctionInvoker.java

Lines changed: 117 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717
package org.springframework.cloud.function.adapter.gcp;
1818

1919
import java.io.BufferedReader;
20+
import java.io.IOException;
2021
import java.nio.charset.StandardCharsets;
22+
import java.util.ArrayList;
2123
import java.util.Collection;
24+
import java.util.List;
2225
import java.util.Map.Entry;
2326
import java.util.function.Consumer;
2427
import java.util.function.Function;
@@ -30,16 +33,20 @@
3033
import com.google.cloud.functions.HttpRequest;
3134
import com.google.cloud.functions.HttpResponse;
3235
import com.google.cloud.functions.RawBackgroundFunction;
36+
37+
import reactor.core.publisher.Flux;
38+
3339
import org.apache.commons.logging.Log;
3440
import org.apache.commons.logging.LogFactory;
35-
41+
import org.reactivestreams.Publisher;
3642
import org.springframework.boot.SpringApplication;
3743
import org.springframework.boot.WebApplicationType;
3844
import org.springframework.cloud.function.context.FunctionCatalog;
3945
import org.springframework.cloud.function.context.FunctionalSpringApplication;
4046
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
4147
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
4248
import org.springframework.cloud.function.context.config.RoutingFunction;
49+
import org.springframework.cloud.function.json.JsonMapper;
4350
import org.springframework.cloud.function.utils.FunctionClassUtils;
4451
import org.springframework.context.ConfigurableApplicationContext;
4552
import org.springframework.messaging.Message;
@@ -76,6 +83,8 @@ public class FunctionInvoker implements HttpFunction, RawBackgroundFunction {
7683

7784
private ConfigurableApplicationContext context;
7885

86+
private JsonMapper jsonMapper;
87+
7988
public FunctionInvoker() {
8089
this(FunctionClassUtils.getStartClass());
8190
}
@@ -90,18 +99,19 @@ private void init(Class<?> configurationClass) {
9099
System.setProperty(ContextFunctionCatalogAutoConfiguration.JSON_MAPPER_PROPERTY, "gson");
91100
}
92101
Thread.currentThread() // TODO: remove after upgrading to 1.0.0-alpha-2-rc5
93-
.setContextClassLoader(FunctionInvoker.class.getClassLoader());
102+
.setContextClassLoader(FunctionInvoker.class.getClassLoader());
94103

95104
log.info("Initializing: " + configurationClass);
96105
SpringApplication springApplication = springApplication(configurationClass);
97106
this.context = springApplication.run();
98107
this.catalog = this.context.getBean(FunctionCatalog.class);
108+
this.jsonMapper = this.context.getBean(JsonMapper.class);
99109
initFunctionConsumerOrSupplierFromCatalog();
100110
}
101111

102112
private <I> Function<Message<I>, Message<byte[]>> lookupFunction() {
103113
Function<Message<I>, Message<byte[]>> function = this.catalog.lookup(functionName,
104-
MimeTypeUtils.APPLICATION_JSON.toString());
114+
MimeTypeUtils.APPLICATION_JSON.toString());
105115
Assert.notNull(function, "'function' with name '" + functionName + "' must not be null");
106116
return function;
107117
}
@@ -114,43 +124,22 @@ private <I> Function<Message<I>, Message<byte[]>> lookupFunction() {
114124
public void service(HttpRequest httpRequest, HttpResponse httpResponse) throws Exception {
115125
Function<Message<BufferedReader>, Message<byte[]>> function = lookupFunction();
116126

117-
Message<BufferedReader> message = this.functionWrapped.getInputType() == Void.class || this.functionWrapped.getInputType() == null ? null
118-
: MessageBuilder.withPayload(httpRequest.getReader()).copyHeaders(httpRequest.getHeaders()).build();
127+
Message<BufferedReader> message = this.functionWrapped.getInputType() == Void.class
128+
|| this.functionWrapped.getInputType() == null ? null
129+
: MessageBuilder.withPayload(httpRequest.getReader()).copyHeaders(httpRequest.getHeaders())
130+
.build();
119131

120-
Message<?> result = function.apply(message);
132+
Object resultObject = function.apply(message);
121133

122-
if (result != null) {
123-
MessageHeaders headers = result.getHeaders();
124-
if (result.getHeaders().containsKey(MessageHeaders.CONTENT_TYPE)) {
125-
httpResponse.setContentType(result.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString());
126-
}
127-
else if (result.getHeaders().containsKey("Content-Type")) {
128-
httpResponse.setContentType(result.getHeaders().get("Content-Type").toString());
129-
}
130-
else {
131-
httpRequest.getContentType().ifPresent(contentType -> httpResponse.setContentType(contentType));
132-
}
133-
String content = result.getPayload() instanceof String strPayload ? strPayload : new String((byte[]) result.getPayload(), StandardCharsets.UTF_8);
134-
httpResponse.getWriter().write(content);
135-
for (Entry<String, Object> header : headers.entrySet()) {
136-
Object values = header.getValue();
137-
if (values instanceof Collection<?>) {
138-
String headerValue = ((Collection<?>) values).stream().map(item -> item.toString()).collect(Collectors.joining(","));
139-
httpResponse.appendHeader(header.getKey(), headerValue);
140-
}
141-
else {
142-
httpResponse.appendHeader(header.getKey(), header.getValue().toString());
143-
}
134+
if (resultObject != null) {
135+
Message<?> result = null;
136+
if (resultObject instanceof Publisher<?>) {
137+
result = getResultFromPublisher(resultObject);
138+
} else {
139+
result = (Message<?>) resultObject;
144140
}
145141

146-
if (headers.containsKey(HTTP_STATUS_CODE)) {
147-
if (headers.get(HTTP_STATUS_CODE) instanceof Integer) {
148-
httpResponse.setStatusCode((int) headers.get(HTTP_STATUS_CODE));
149-
}
150-
else {
151-
log.warn("The statusCode should be an Integer value");
152-
}
153-
}
142+
buildHttpResponse(httpRequest, httpResponse, result);
154143
}
155144
}
156145

@@ -162,19 +151,109 @@ else if (result.getHeaders().containsKey("Content-Type")) {
162151
* @param context event context.
163152
* @since 3.0.5
164153
*/
154+
@SuppressWarnings("unchecked")
165155
@Override
166156
public void accept(String json, Context context) {
167157
Function<Message<String>, Message<byte[]>> function = lookupFunction();
168158
Message<String> message = this.functionWrapped.getInputType() == Void.class ? null
169-
: MessageBuilder.withPayload(json).setHeader("gcf_context", context).build();
159+
: MessageBuilder.withPayload(json).setHeader("gcf_context", context).build();
170160

171-
Message<byte[]> result = function.apply(message);
161+
Object resultObject = function.apply(message);
162+
163+
Message<byte[]> result = null;
164+
if (resultObject instanceof Publisher<?>) {
165+
result = getResultFromPublisher(resultObject);
166+
} else {
167+
result = (Message<byte[]>) resultObject;
168+
}
172169

173170
if (result != null) {
174171
log.info("Dropping background function result: " + new String(result.getPayload()));
175172
}
176173
}
177174

175+
/**
176+
* This method build the http response from service
177+
*
178+
* @throws IOException
179+
*/
180+
private void buildHttpResponse(HttpRequest httpRequest, HttpResponse httpResponse, Message<?> result)
181+
throws IOException {
182+
MessageHeaders headers = result.getHeaders();
183+
if (result.getHeaders().containsKey(MessageHeaders.CONTENT_TYPE)) {
184+
httpResponse.setContentType(result.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString());
185+
} else if (result.getHeaders().containsKey("Content-Type")) {
186+
httpResponse.setContentType(result.getHeaders().get("Content-Type").toString());
187+
} else {
188+
httpRequest.getContentType().ifPresent(contentType -> httpResponse.setContentType(contentType));
189+
}
190+
String content = result.getPayload() instanceof String strPayload ? strPayload
191+
: new String((byte[]) result.getPayload(), StandardCharsets.UTF_8);
192+
httpResponse.getWriter().write(content);
193+
for (Entry<String, Object> header : headers.entrySet()) {
194+
Object values = header.getValue();
195+
if (values instanceof Collection<?>) {
196+
String headerValue = ((Collection<?>) values).stream().map(item -> item.toString())
197+
.collect(Collectors.joining(","));
198+
httpResponse.appendHeader(header.getKey(), headerValue);
199+
} else {
200+
httpResponse.appendHeader(header.getKey(), header.getValue().toString());
201+
}
202+
}
203+
204+
if (headers.containsKey(HTTP_STATUS_CODE)) {
205+
if (headers.get(HTTP_STATUS_CODE) instanceof Integer) {
206+
httpResponse.setStatusCode((int) headers.get(HTTP_STATUS_CODE));
207+
} else {
208+
log.warn("The statusCode should be an Integer value");
209+
}
210+
}
211+
}
212+
213+
/**
214+
* This methd get the result from reactor's publisher
215+
*
216+
* For reference: https://github.com/spring-cloud/spring-cloud-function/blob/main/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java
217+
*/
218+
private Message<byte[]> getResultFromPublisher(Object resultObject) {
219+
List<Object> results = new ArrayList<>();
220+
Message<?> lastMessage = null;
221+
for (Object item : Flux.from((Publisher<?>) resultObject).toIterable()) {
222+
log.info("Response value: " + item);
223+
if (item instanceof Message<?> messageItem) {
224+
results.add(convertFromJsonIfNecessary(messageItem.getPayload()));
225+
lastMessage = messageItem;
226+
} else {
227+
results.add(convertFromJsonIfNecessary(item));
228+
}
229+
}
230+
231+
byte[] resultsPayload;
232+
if (results.size() == 1) {
233+
resultsPayload = jsonMapper.toJson(results.get(0));
234+
} else if (results.size() > 1) {
235+
resultsPayload = jsonMapper.toJson(results);
236+
} else {
237+
resultsPayload = null;
238+
}
239+
240+
Assert.notNull(resultsPayload, "Couldn't resolve payload result");
241+
242+
MessageBuilder<byte[]> messageBuilder = MessageBuilder.withPayload(resultsPayload);
243+
if (lastMessage != null) {
244+
messageBuilder.copyHeaders(lastMessage.getHeaders());
245+
}
246+
return messageBuilder.build();
247+
}
248+
249+
private Object convertFromJsonIfNecessary(Object value) {
250+
if (JsonMapper.isJsonString(value)) {
251+
return jsonMapper.fromJson(value, Object.class);
252+
}
253+
254+
return value;
255+
}
256+
178257
private void initFunctionConsumerOrSupplierFromCatalog() {
179258
String name = resolveName(Function.class);
180259
this.functionWrapped = this.catalog.lookup(Function.class, name);

spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/test/java/org/springframework/cloud/function/adapter/gcp/FunctionInvokerBackgroundTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222

2323
import com.github.blindpirate.extensions.CaptureSystemOutput;
2424
import com.google.gson.Gson;
25+
26+
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Mono;
28+
2529
import org.hamcrest.Matchers;
2630
import org.junit.jupiter.api.Test;
2731

@@ -58,6 +62,18 @@ public void testJsonInputFunction_Background(CaptureSystemOutput.OutputCapture o
5862
"Thank you for sending the message: hello", null, null);
5963
}
6064

65+
@Test
66+
public void testJsonInputFunction_BackgroundMono(CaptureSystemOutput.OutputCapture outputCapture) {
67+
testBackgroundFunction(outputCapture, JsonInputFunctionMono.class, new IncomingRequest("hello"),
68+
"Thank you for sending the message: hello", null, null);
69+
}
70+
71+
@Test
72+
public void testJsonInputFunction_BackgroundFlux(CaptureSystemOutput.OutputCapture outputCapture) {
73+
testBackgroundFunction(outputCapture, JsonInputFunctionFlux.class, new IncomingRequest("hello"),
74+
"Thank you for sending the message: hello", null, null);
75+
}
76+
6177
@Test
6278
public void testJsonInputOutputFunction_Background(CaptureSystemOutput.OutputCapture outputCapture) {
6379
testBackgroundFunction(outputCapture, JsonInputOutputFunction.class, new IncomingRequest("hello"),
@@ -149,6 +165,28 @@ public Function<IncomingRequest, String> function() {
149165

150166
}
151167

168+
@Configuration
169+
@Import({ ContextFunctionCatalogAutoConfiguration.class })
170+
protected static class JsonInputFunctionMono {
171+
172+
@Bean
173+
public Function<Mono<IncomingRequest>, Mono<String>> function() {
174+
return (in) -> in.map(o -> "Thank you for sending the message: " + o.message);
175+
}
176+
177+
}
178+
179+
@Configuration
180+
@Import({ ContextFunctionCatalogAutoConfiguration.class })
181+
protected static class JsonInputFunctionFlux {
182+
183+
@Bean
184+
public Function<Flux<IncomingRequest>, Flux<String>> function() {
185+
return (in) -> in.map(o -> "Thank you for sending the message: " + o.message);
186+
}
187+
188+
}
189+
152190
@Configuration
153191
@Import({ ContextFunctionCatalogAutoConfiguration.class })
154192
protected static class JsonInputOutputFunction {

spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/test/java/org/springframework/cloud/function/adapter/gcp/FunctionInvokerHttpTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
import com.google.cloud.functions.HttpRequest;
3131
import com.google.cloud.functions.HttpResponse;
3232
import com.google.gson.Gson;
33+
34+
import reactor.core.publisher.Flux;
35+
import reactor.core.publisher.Mono;
36+
3337
import org.junit.jupiter.api.BeforeEach;
3438
import org.junit.jupiter.api.Test;
3539
import org.junit.jupiter.api.extension.ExtendWith;
@@ -90,6 +94,36 @@ public void testHelloWorldSupplier() throws Exception {
9094

9195
}
9296

97+
@Test
98+
public void testJsonInputFunctionMono() throws Exception {
99+
100+
FunctionInvoker handler = new FunctionInvoker(JsonInputFunctionMono.class);
101+
102+
String expectedOutput = "Thank you for sending the message: hello";
103+
IncomingRequest input = new IncomingRequest("hello");
104+
105+
when(request.getReader()).thenReturn(new BufferedReader(new StringReader(gson.toJson(input))));
106+
handler.service(request, response);
107+
bufferedWriter.close();
108+
109+
110+
assertThat(writer.toString()).isEqualTo(gson.toJson(expectedOutput));
111+
}
112+
113+
@Test
114+
public void testJsonInputFunctionFlux() throws Exception {
115+
116+
FunctionInvoker handler = new FunctionInvoker(JsonInputFunctionFlux.class);
117+
118+
String expectedOutput = "hello!!!";
119+
120+
when(request.getReader()).thenReturn(new BufferedReader(new StringReader("hello")));
121+
handler.service(request, response);
122+
bufferedWriter.close();
123+
124+
125+
assertThat(writer.toString()).isEqualTo(gson.toJson(expectedOutput));
126+
}
93127

94128
@Test
95129
public void testJsonInputFunction() throws Exception {
@@ -238,6 +272,28 @@ public Function<IncomingRequest, String> function() {
238272

239273
}
240274

275+
@Configuration
276+
@Import({ ContextFunctionCatalogAutoConfiguration.class })
277+
protected static class JsonInputFunctionMono {
278+
279+
@Bean
280+
public Function<Mono<IncomingRequest>, Mono<String>> function() {
281+
return (in) -> in.map(o -> "Thank you for sending the message: " + o.message);
282+
}
283+
284+
}
285+
286+
@Configuration
287+
@Import({ ContextFunctionCatalogAutoConfiguration.class })
288+
protected static class JsonInputFunctionFlux {
289+
290+
@Bean
291+
public Function<Flux<String>, Flux<String>> function() {
292+
return (in) -> in.map(word -> word + "!!!");
293+
}
294+
295+
}
296+
241297
@Configuration
242298
@Import({ ContextFunctionCatalogAutoConfiguration.class })
243299
protected static class JsonInputOutputFunction {

0 commit comments

Comments
 (0)