Skip to content

Commit 70fd451

Browse files
committed
GH-884 Add initial support for BiFunction
Resolves #884 polish
1 parent 1381cd4 commit 70fd451

File tree

4 files changed

+64
-9
lines changed

4 files changed

+64
-9
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
package org.springframework.cloud.function.context.catalog;
1818

1919
import java.lang.reflect.Method;
20+
import java.lang.reflect.ParameterizedType;
2021
import java.lang.reflect.Type;
2122
import java.util.Arrays;
2223
import java.util.Set;
24+
import java.util.function.BiFunction;
2325
import java.util.function.Consumer;
2426
import java.util.function.Function;
2527
import java.util.function.Supplier;
@@ -34,11 +36,13 @@
3436
import org.springframework.cloud.function.context.FunctionProperties;
3537
import org.springframework.cloud.function.context.FunctionRegistration;
3638
import org.springframework.cloud.function.context.FunctionRegistry;
39+
import org.springframework.cloud.function.context.config.FunctionContextUtils;
3740
import org.springframework.cloud.function.core.FunctionInvocationHelper;
3841
import org.springframework.cloud.function.json.JsonMapper;
3942
import org.springframework.context.ApplicationContext;
4043
import org.springframework.context.ApplicationContextAware;
4144
import org.springframework.context.support.GenericApplicationContext;
45+
import org.springframework.core.ResolvableType;
4246
import org.springframework.core.convert.ConversionService;
4347
import org.springframework.lang.Nullable;
4448
import org.springframework.messaging.Message;
@@ -138,6 +142,9 @@ else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
138142
functionRegistration = this.applicationContext
139143
.getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
140144
}
145+
else if (functionCandidate instanceof BiFunction) {
146+
functionRegistration = this.registerMessagingBiFunction((BiFunction) functionCandidate, functionName);
147+
}
141148
else {
142149
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
143150
}
@@ -163,6 +170,36 @@ else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
163170
return (T) function;
164171
}
165172

173+
@SuppressWarnings({ "rawtypes", "unchecked" })
174+
private FunctionRegistration registerMessagingBiFunction(BiFunction userFunction, String functionName) {
175+
Type biFunctionType = FunctionContextUtils.findType(this.applicationContext.getBeanFactory(), functionName);
176+
Type inputType1 = Object.class;
177+
Type inputType2 = Object.class;
178+
if (biFunctionType instanceof ParameterizedType) {
179+
inputType1 = ((ParameterizedType) biFunctionType).getActualTypeArguments()[0];
180+
inputType2 = ((ParameterizedType) biFunctionType).getActualTypeArguments()[1];
181+
}
182+
183+
if (!FunctionTypeUtils.isTypeMap(inputType2)) {
184+
throw new UnsupportedOperationException("BiFunction's second argument must be assignable to Map, since BiFunction "
185+
+ "represents parsed Message with first argument being payload and second headers. "
186+
+ "Other signatures are not supported at the moment.");
187+
}
188+
189+
ResolvableType messageType = ResolvableType.forClassWithGenerics(Message.class, ResolvableType.forType(inputType1));
190+
Type biFunctionWrapperType = ResolvableType.forClassWithGenerics(Function.class, messageType, ResolvableType.forType(inputType2)).getType();
191+
192+
Function wrapperFunction = message -> {
193+
Object payload = ((Message) message).getPayload();
194+
if (payload.getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
195+
payload = null;
196+
}
197+
return userFunction.apply(payload, ((Message) message).getHeaders());
198+
};
199+
200+
return new FunctionRegistration<>(wrapperFunction, functionName).type(biFunctionWrapperType);
201+
}
202+
166203
private Object discoverFunctionInBeanFactory(String functionName) {
167204
Object functionCandidate = null;
168205
if (this.applicationContext.containsBean(functionName)) {

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ public static boolean isTypeCollection(Type type) {
102102
return Collection.class.isAssignableFrom(rawType) || JsonNode.class.isAssignableFrom(rawType);
103103
}
104104

105+
public static boolean isTypeMap(Type type) {
106+
if (Map.class.isAssignableFrom(getRawType(type))) {
107+
return true;
108+
}
109+
type = getGenericType(type);
110+
Class<?> rawType = type instanceof ParameterizedType ? getRawType(type) : (Class<?>) type;
111+
return Map.class.isAssignableFrom(rawType);
112+
}
113+
105114
public static boolean isTypeArray(Type type) {
106115
return getRawType(type).isArray();
107116
}

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Ob
8989
for (Object item : iterablePayload) {
9090
boolean isConverted = false;
9191
if (item.getClass().getName().startsWith("org.springframework.kafka.support.KafkaNull")) {
92-
resultList.add(item);
92+
resultList.add(null);
9393
isConverted = true;
9494
}
9595
for (Iterator<MessageConverter> iterator = getConverters().iterator(); iterator.hasNext() && !isConverted;) {

spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.concurrent.atomic.AtomicInteger;
3636
import java.util.concurrent.atomic.AtomicReference;
37+
import java.util.function.BiFunction;
3738
import java.util.function.Consumer;
3839
import java.util.function.Function;
3940
import java.util.function.Supplier;
@@ -184,17 +185,18 @@ public void testDefaultLookup() throws Exception {
184185
assertThat(((FunctionInvocationWrapper) function).isComposed()).isTrue();
185186
}
186187

188+
@SuppressWarnings({ "unchecked", "rawtypes" })
187189
@Test
188-
public void testImperativeFunction() {
190+
public void testBiFunction() {
189191
FunctionCatalog catalog = this.configureCatalog();
190192

191-
// Function<String, String> asIs = catalog.lookup("uppercase");
192-
// assertThat(asIs.apply("uppercase")).isEqualTo("UPPERCASE");
193-
//
194-
// Function<Flux<String>, Flux<String>> asFlux = catalog.lookup("uppercase");
195-
// List<String> result = asFlux.apply(Flux.just("uppercaseFlux", "uppercaseFlux2")).collectList().block();
196-
// assertThat(result.get(0)).isEqualTo("UPPERCASEFLUX");
197-
// assertThat(result.get(1)).isEqualTo("UPPERCASEFLUX2");
193+
Function biFunction = catalog.lookup("biFuncUpperCase");
194+
assertThat(biFunction.apply("hello")).isEqualTo("HELLO");
195+
}
196+
197+
@Test
198+
public void testImperativeFunction() {
199+
FunctionCatalog catalog = this.configureCatalog();
198200

199201
Function<Flux<Message<byte[]>>, Flux<Message<byte[]>>> messageFlux = catalog.lookup("uppercase", "application/json");
200202
Message<byte[]> message1 = MessageBuilder.withPayload("\"uppercaseFlux\"".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
@@ -1056,6 +1058,13 @@ public Supplier<String> numberword() {
10561058
return () -> "one";
10571059
}
10581060

1061+
@Bean
1062+
public BiFunction<String, Map, String> biFuncUpperCase() {
1063+
return (p, h) -> {
1064+
return p.toUpperCase();
1065+
};
1066+
}
1067+
10591068
@Bean
10601069
public Function<Map<String, Object>, Person> maptopojo() {
10611070
return map -> {

0 commit comments

Comments
 (0)