2121import java .lang .reflect .TypeVariable ;
2222import java .lang .reflect .WildcardType ;
2323import java .util .ArrayList ;
24+ import java .util .Arrays ;
2425import java .util .Collection ;
2526import java .util .Collections ;
2627import java .util .HashMap ;
@@ -184,11 +185,10 @@ else if (registration.getNames().contains(definition) || registration.getTarget(
184185
185186 boolean isFunctionDefinitionEligible (String functionDefinition ) {
186187 if (this .functionProperties != null ) {
187- for (String definition : this .functionProperties .getIneligibleDefinitions ()) {
188- if (functionDefinition .contains (definition )) {
189- return false ;
190- }
191- }
188+ this .functionProperties .getIneligibleDefinitions ().contains (functionDefinition );
189+ boolean matchFoundInBoth = !Collections .disjoint (Arrays .asList (functionDefinition .split ("\\ |" )),
190+ this .functionProperties .getIneligibleDefinitions ());
191+ return !matchFoundInBoth ;
192192 }
193193 return true ;
194194 }
@@ -563,9 +563,6 @@ public Type getItemType(Type type) {
563563 if (FunctionTypeUtils .isPublisher (type ) || FunctionTypeUtils .isMessage (type ) || FunctionTypeUtils .isTypeCollection (type )) {
564564 type = FunctionTypeUtils .getGenericType (type );
565565 }
566- if (FunctionTypeUtils .isMessage (type )) {
567- type = FunctionTypeUtils .getGenericType (type );
568- }
569566 return type ;
570567 }
571568
@@ -808,18 +805,11 @@ private Class<?> getRawClassFor(@Nullable Type type) {
808805 */
809806 private Object enrichInvocationResultIfNecessary (Object input , Object result ) {
810807 if (result != null && !(result instanceof Publisher ) && input instanceof Message ) {
811- if (result instanceof Message ) {
812- if (functionInvocationHelper != null && CloudEventMessageUtils .isCloudEvent (((Message ) input ))) {
813- result = functionInvocationHelper .postProcessResult (result , (Message ) input );
814- }
808+ if (functionInvocationHelper != null && CloudEventMessageUtils .isCloudEvent (((Message ) input ))) {
809+ result = functionInvocationHelper .postProcessResult (result , (Message ) input );
815810 }
816- else {
817- if (functionInvocationHelper != null && CloudEventMessageUtils .isCloudEvent (((Message ) input ))) {
818- result = functionInvocationHelper .postProcessResult (result , (Message ) input );
819- }
820- else if (!FunctionTypeUtils .isCollectionOfMessage (this .outputType )) {
821- result = MessageBuilder .withPayload (result ).copyHeaders (this .sanitizeHeaders (((Message ) input ).getHeaders ())).build ();
822- }
811+ if (!FunctionTypeUtils .isCollectionOfMessage (this .outputType )) {
812+ result = MessageBuilder .withPayload (result ).copyHeaders (this .sanitizeHeaders (((Message ) input ).getHeaders ())).build ();
823813 }
824814 }
825815 return result ;
@@ -920,21 +910,21 @@ private String contentTypeHeaderValue(Message<?> msg) {
920910 @ SuppressWarnings ("unchecked" )
921911 private Object invokeFunction (Object convertedInput ) {
922912 Object result ;
923- if (!this .isTypePublisher (this .inputType ) && convertedInput instanceof Publisher ) {
924- result = convertedInput instanceof Mono
925- ? Mono .from (( Publisher ) convertedInput ).map (value -> this .invokeFunctionAndEnrichResultIfNecessary (value ))
913+ if (!this .isTypePublisher (this .inputType ) && convertedInput instanceof Publisher publisherInput ) {
914+ result = publisherInput instanceof Mono
915+ ? Mono .from (publisherInput ).map (value -> this .invokeFunctionAndEnrichResultIfNecessary (value ))
926916 .doOnError (ex -> logger .error ("Failed to invoke function '" + this .functionDefinition + "'" , (Throwable ) ex ))
927- : Flux .from (( Publisher ) convertedInput ).map (value -> this .invokeFunctionAndEnrichResultIfNecessary (value ))
917+ : Flux .from (publisherInput ).map (value -> this .invokeFunctionAndEnrichResultIfNecessary (value ))
928918 .doOnError (ex -> logger .error ("Failed to invoke function '" + this .functionDefinition + "'" , (Throwable ) ex ));
929919 }
930920 else {
931921 result = this .invokeFunctionAndEnrichResultIfNecessary (convertedInput );
932- if (result instanceof Flux ) {
933- result = (( Flux ) result ) .doOnError (ex -> logger .error ("Failed to invoke function '"
922+ if (result instanceof Flux flux ) {
923+ result = flux .doOnError (ex -> logger .error ("Failed to invoke function '"
934924 + this .functionDefinition + "'" , (Throwable ) ex ));
935925 }
936- else if (result instanceof Mono ) {
937- result = (( Mono ) result ) .doOnError (ex -> logger .error ("Failed to invoke function '"
926+ else if (result instanceof Mono mono ) {
927+ result = mono .doOnError (ex -> logger .error ("Failed to invoke function '"
938928 + this .functionDefinition + "'" , (Throwable ) ex ));
939929 }
940930 }
@@ -989,8 +979,8 @@ else if (value instanceof Mono) {
989979 result = this .postProcessFunction ((Publisher ) result , firstInputMessage );
990980 }
991981
992- return value instanceof OriginalMessageHolder
993- ? this .enrichInvocationResultIfNecessary ((( OriginalMessageHolder ) value ).getOriginalMessage (), result )
982+ return value instanceof OriginalMessageHolder originalMessageHolder
983+ ? this .enrichInvocationResultIfNecessary ((originalMessageHolder ).getOriginalMessage (), result )
994984 : result ;
995985 }
996986
@@ -1035,8 +1025,8 @@ private Publisher postProcessFunction(Publisher result, AtomicReference<Message<
10351025 private Object invokeConsumer (Object convertedInput ) {
10361026 Object result = null ;
10371027 if (this .isTypePublisher (this .inputType )) {
1038- if (convertedInput instanceof Flux ) {
1039- result = (( Flux ) convertedInput )
1028+ if (convertedInput instanceof Flux fluxInput ) {
1029+ result = fluxInput
10401030 .transform (flux -> {
10411031 flux = Flux .from ((Publisher ) flux ).map (v -> this .extractValueFromOriginalValueHolderIfNecessary (v ));
10421032 ((Consumer ) this .target ).accept (flux );
@@ -1052,12 +1042,12 @@ private Object invokeConsumer(Object convertedInput) {
10521042 }).then ();
10531043 }
10541044 }
1055- else if (convertedInput instanceof Publisher ) {
1045+ else if (convertedInput instanceof Publisher publisherInput ) {
10561046 result = convertedInput instanceof Mono
1057- ? Mono .from (( Publisher ) convertedInput )
1047+ ? Mono .from (publisherInput )
10581048 .map (v -> this .extractValueFromOriginalValueHolderIfNecessary (v ))
10591049 .doOnNext ((Consumer ) this .target ).then ()
1060- : Flux .from (( Publisher ) convertedInput )
1050+ : Flux .from (publisherInput )
10611051 .map (v -> this .extractValueFromOriginalValueHolderIfNecessary (v ))
10621052 .doOnNext ((Consumer ) this .target ).then ();
10631053 }
@@ -1185,18 +1175,12 @@ private Message filterOutHeaders(Message message) {
11851175 }
11861176
11871177 private boolean isExtractPayload (Message <?> message , Type type ) {
1188- if (this .propagateInputHeaders ) {
1189- return false ;
1190- }
1191- if (this .isRoutingFunction ()) {
1178+ if (this .propagateInputHeaders || this .isRoutingFunction () || FunctionTypeUtils .isMessage (type )) {
11921179 return false ;
11931180 }
11941181 if (FunctionTypeUtils .isCollectionOfMessage (type )) {
11951182 return true ;
11961183 }
1197- if (FunctionTypeUtils .isMessage (type )) {
1198- return false ;
1199- }
12001184
12011185 Object payload = message .getPayload ();
12021186 if ((payload instanceof byte [])) {
@@ -1291,7 +1275,7 @@ else if (ObjectUtils.isArray(convertedOutput) && !(convertedOutput instanceof by
12911275 * case that requires it since it may contain forwarding url
12921276 */
12931277 private boolean containsRetainMessageSignalInHeaders (Message message ) {
1294- if (functionInvocationHelper != null && functionInvocationHelper .isRetainOuputAsMessage (message )) {
1278+ if (functionInvocationHelper != null && functionInvocationHelper .isRetainOutputAsMessage (message )) {
12951279 return true ;
12961280 }
12971281 else {
@@ -1348,7 +1332,7 @@ private boolean isWrapConvertedInputInMessage(Object convertedInput) {
13481332 *
13491333 */
13501334 private Type extractActualValueTypeIfNecessary (Type type ) {
1351- if (type instanceof ParameterizedType && (FunctionTypeUtils .isPublisher (type ) || FunctionTypeUtils .isMessage (type ))) {
1335+ if (type instanceof ParameterizedType && (FunctionTypeUtils .isPublisher (type ) || FunctionTypeUtils .isMessage (type ))) {
13521336 return FunctionTypeUtils .getGenericType (type );
13531337 }
13541338 return type ;
0 commit comments