RocketMQ 支持PollMessageSource

pull/2769/head
sorie 3 years ago committed by Steve Rao
parent b8bcca486f
commit a2a006b53b

@ -55,9 +55,12 @@ public class RocketMQPollableConsumeApplication {
}
public interface PolledConsumer {
@Input("pollable-in-0")
PollableMessageSource pollable();
}
@Bean
public ApplicationRunner producer() {
return args -> {
@ -78,15 +81,18 @@ public class RocketMQPollableConsumeApplication {
while (true) {
try {
if (!destIn.poll((m) -> {
SimpleMsg newPayload = (SimpleMsg)m.getPayload();
SimpleMsg newPayload = (SimpleMsg) m.getPayload();
System.out.println(newPayload.getMsg());
}, new ParameterizedTypeReference<SimpleMsg>() {})) {
}, new ParameterizedTypeReference<SimpleMsg>() {
})) {
Thread.sleep(1000);
}
} catch (Exception e) {
}
catch (Exception e) {
// handle failure
}
}
};
}
}

@ -65,9 +65,10 @@ public class ExtendedBindingHandlerMappingsProviderConfiguration {
}
/**
* Register message converter to adapte Spring Cloud Stream.
* Refer to https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters .
* @return
* Register message converter to adapte Spring Cloud Stream. Refer to
* https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters
* .
* @return message converter
*/
@Bean
public MessageConverter rocketMQCustomMessageConverter() {

@ -20,7 +20,12 @@ import java.util.ArrayList;
import java.util.List;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.*;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.util.ClassUtils;
/**
@ -95,22 +100,25 @@ public class RocketMQMessageConverter extends AbstractMessageConverter {
}
/**
* Convert the message payload from serialized form to an Object by RocketMQMessageConverter.
* Convert the message payload from serialized form to an Object by
* RocketMQMessageConverter.
* @param message the input message
* @param targetClass the target class for the conversion
* @param conversionHint an extra object passed to the {@link MessageConverter},
* e.g. the associated {@code MethodParameter} (may be {@code null}}
* @param conversionHint an extra object passed to the {@link MessageConverter}, e.g.
* the associated {@code MethodParameter} (may be {@code null}}
* @return the result of the conversion, or {@code null} if the converter cannot
* perform the conversion
* @since 4.2
*/
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
protected Object convertFromInternal(Message<?> message, Class<?> targetClass,
Object conversionHint) {
Object payload = null;
for (MessageConverter converter : getMessageConverter().getConverters()) {
try {
payload = converter.fromMessage(message, targetClass);
} catch (Exception ignore) {
}
catch (Exception ignore) {
}
if (payload != null) {
return payload;
@ -122,5 +130,4 @@ public class RocketMQMessageConverter extends AbstractMessageConverter {
return payload;
}
}

Loading…
Cancel
Save