add rocketmq binder

pull/112/head
fangjian0423 6 years ago
parent 5c9c29b013
commit d3d9de02a1

@ -81,6 +81,7 @@
<module>spring-cloud-alibaba-sentinel-datasource</module>
<module>spring-cloud-alibaba-nacos-config</module>
<module>spring-cloud-alibaba-nacos-discovery</module>
<module>spring-cloud-stream-binder-rocketmq</module>
<module>spring-cloud-alibaba-examples</module>
<module>spring-cloud-alibaba-test</module>
<module>spring-cloud-alibaba-docs</module>

@ -24,6 +24,8 @@
<aliyun.sdk.version>4.0.1</aliyun.sdk.version>
<alicloud.context.version>1.0.0</alicloud.context.version>
<aliyun.sdk.edas.version>2.16.0</aliyun.sdk.edas.version>
<rocketmq.version>4.3.1</rocketmq.version>
<metrics.core>4.0.3</metrics.core>
</properties>
<dependencyManagement>
@ -115,6 +117,11 @@
<artifactId>sentinel-dubbo-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- Aliyun OSS dependencies -->
@ -202,6 +209,14 @@
<artifactId>spring-cloud-starter-alicloud-acm</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Third dependencies -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.core}</version>
</dependency>
<!-- Testing Dependencies -->

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba</artifactId>
<version>0.2.1.BUILD-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
<name>Spring Cloud Alibaba RocketMQ Binder</name>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator-autoconfigure</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -0,0 +1,26 @@
package org.springframework.cloud.stream.binder.rocketmq;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public interface RocketMQBinderConstants {
/**
* Header key
*/
String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKET_MESSAGE";
String ROCKET_FLAG = "ROCKET_FLAG";
String ROCKET_SEND_RESULT = "ROCKET_SEND_RESULT";
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/**
* Instrumentation key
*/
String LASTSEND_TIMESTAMP = "lastSend.timestamp";
String ENDPOINT_ID = "rocketmq-binder";
}

@ -0,0 +1,101 @@
package org.springframework.cloud.stream.binder.rocketmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>,
ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class);
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQTopicProvisioner rocketTopicProvisioner;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager;
public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQTopicProvisioner provisioningProvider,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) {
super(null, provisioningProvider);
this.consumersManager = consumersManager;
this.extendedBindingProperties = extendedBindingProperties;
this.rocketTopicProvisioner = provisioningProvider;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.instrumentationManager = instrumentationManager;
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties>
producerProperties,
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(),
rocketBinderConfigurationProperties, instrumentationManager);
} else {
throw new RuntimeException(
"Binding for channel " + destination.getName() + "has been disabled, message can't be delivered");
}
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties>
consumerProperties)
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException("'group' must be configured for channel + " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager,
consumerProperties, destination.getName(), group, instrumentationManager);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group,
consumerProperties);
if (consumerProperties.getMaxAttempts() > 1) {
rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));
rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
} else {
rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
}
return rocketInboundChannelAdapter;
}
@Override
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
return extendedBindingProperties.getExtendedProducerProperties(channelName);
}
}

@ -0,0 +1,106 @@
package org.springframework.cloud.stream.binder.rocketmq;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
import org.springframework.integration.support.MutableMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageHeaderAccessor;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
public RocketMQMessageHeaderAccessor() {
super();
}
public RocketMQMessageHeaderAccessor(Message<?> message) {
super(message);
}
public Acknowledgement getAcknowledgement(Message message) {
return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
}
public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) {
setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
return this;
}
public String getTags() {
return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, "");
}
public RocketMQMessageHeaderAccessor withTags(String tag) {
setHeader(MessageConst.PROPERTY_TAGS, tag);
return this;
}
public String getKeys() {
return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, "");
}
public RocketMQMessageHeaderAccessor withKeys(String keys) {
setHeader(MessageConst.PROPERTY_KEYS, keys);
return this;
}
public MessageExt getRocketMessage() {
return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class);
}
public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) {
setHeader(ORIGINAL_ROCKET_MESSAGE, message);
return this;
}
public Integer getDelayTimeLevel() {
return (Integer)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
}
public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
return this;
}
public Integer getFlag() {
return (Integer)getMessageHeaders().getOrDefault(ROCKET_FLAG, 0);
}
public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
setHeader(ROCKET_FLAG, delayTimeLevel);
return this;
}
public SendResult getSendResult() {
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
}
public static void putSendResult(MutableMessage message, SendResult sendResult) {
message.getHeaders().put(ROCKET_SEND_RESULT, sendResult);
}
public Map<String, String> getUserProperties() {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, Object> entry : this.toMap().entrySet()) {
if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry
.getKey().equals(MessageHeaders.CONTENT_TYPE)) {
result.put(entry.getKey(), (String)entry.getValue());
}
}
return result;
}
}

@ -0,0 +1,39 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.MetricRegistry;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Endpoint(id = ENDPOINT_ID)
public class RocketMQBinderEndpoint {
private MetricRegistry metricRegistry = new MetricRegistry();
private Map<String, Object> runtime = new ConcurrentHashMap<>();
@ReadOperation
public Map<String, Object> invoke() {
Map<String, Object> result = new HashMap<>();
result.put("metrics", metricRegistry().getMetrics());
result.put("runtime", runtime());
return result;
}
public MetricRegistry metricRegistry() {
return metricRegistry;
}
public Map<String, Object> runtime() {
return runtime;
}
}

@ -0,0 +1,37 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
private final InstrumentationManager instrumentationManager;
public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
this.instrumentationManager = instrumentationManager;
}
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
if (instrumentationManager.getHealthInstrumentations().stream().
allMatch(Instrumentation::isUp)) {
builder.up();
return;
}
if (instrumentationManager.getHealthInstrumentations().stream().
allMatch(Instrumentation::isOutOfService)) {
builder.outOfService();
return;
}
builder.down();
instrumentationManager.getHealthInstrumentations().stream().
filter(instrumentation -> !instrumentation.isStarted()).
forEach(instrumentation1 -> builder.withException(instrumentation1.getStartException()));
}
}

@ -0,0 +1,54 @@
package org.springframework.cloud.stream.binder.rocketmq.config;
import org.apache.rocketmq.client.log.ClientLogger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Configuration
@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class})
public class RocketMQBinderAutoConfiguration {
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@Autowired
public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel());
}
@Bean
public RocketMQTopicProvisioner provisioningProvider() {
return new RocketMQTopicProvisioner();
}
@Bean
public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
InstrumentationManager instrumentationManager,
ConsumersManager consumersManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties,
provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager);
return binder;
}
@Bean
public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) {
return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties);
}
}

@ -0,0 +1,33 @@
package org.springframework.cloud.stream.binder.rocketmq.config;
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Configuration
@AutoConfigureAfter(EndpointAutoConfiguration.class)
public class RocketMQBinderEndpointAutoConfiguration {
@Bean
public RocketMQBinderEndpoint rocketBinderEndpoint() {
return new RocketMQBinderEndpoint();
}
@Bean
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(InstrumentationManager instrumentationManager) {
return new RocketMQBinderHealthIndicator(instrumentationManager);
}
@Bean
public InstrumentationManager instrumentationManager(RocketMQBinderEndpoint rocketBinderEndpoint) {
return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), rocketBinderEndpoint.runtime());
}
}

@ -0,0 +1,65 @@
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class Acknowledgement {
/**
* for {@link ConsumeConcurrentlyContext} using
*/
private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
/**
* Message consume retry strategy<br>
* -1,no retry,put into DLQ directly<br>
* 0,broker control retry frequency<br>
* >0,client control retry frequency
*/
private Integer consumeConcurrentlyDelayLevel = 0;
/**
* for {@link ConsumeOrderlyContext} using
*/
private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L;
public Acknowledgement setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
this.consumeConcurrentlyStatus = consumeConcurrentlyStatus;
return this;
}
public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() {
return consumeConcurrentlyStatus;
}
public ConsumeOrderlyStatus getConsumeOrderlyStatus() {
return consumeOrderlyStatus;
}
public Acknowledgement setConsumeOrderlyStatus(ConsumeOrderlyStatus consumeOrderlyStatus) {
this.consumeOrderlyStatus = consumeOrderlyStatus;
return this;
}
public Integer getConsumeConcurrentlyDelayLevel() {
return consumeConcurrentlyDelayLevel;
}
public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) {
this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel;
}
public Long getConsumeOrderlySuspendCurrentQueueTimeMill() {
return consumeOrderlySuspendCurrentQueueTimeMill;
}
public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) {
this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
}
}

@ -0,0 +1,76 @@
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public abstract class CloudStreamMessageListener implements MessageListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private ConsumerPropertiesWrapper consumerPropertiesWrapper;
private final InstrumentationManager instrumentationManager;
private final Consumer<Message> sendMsgAction;
CloudStreamMessageListener(InstrumentationManager instrumentationManager, Consumer<Message> sendMsgAction) {
this.instrumentationManager = instrumentationManager;
this.sendMsgAction = sendMsgAction;
}
public String getTagsString() {
return String.join(" || ", consumerPropertiesWrapper.getTagsSet());
}
public void setConsumerPropertiesWrapper(String group, String topic, String tags) {
this.consumerPropertiesWrapper = new ConsumerPropertiesWrapper(group, topic, tags);
}
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
List<Acknowledgement> acknowledgements = new ArrayList<>();
msgs.forEach(msg -> {
logger.info("consuming msg:\n" + msg);
logger.debug("message body:\n" + new String(msg.getBody()));
if (consumerPropertiesWrapper != null && msg.getTopic().equals(consumerPropertiesWrapper.getTopic())) {
if (StringUtils.isEmpty(consumerPropertiesWrapper.getTags()) || consumerPropertiesWrapper.getTagsSet()
.contains(msg.getTags())) {
try {
Acknowledgement acknowledgement = new Acknowledgement();
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody()).
setHeaders(new RocketMQMessageHeaderAccessor().
withAcknowledgment(acknowledgement).
withTags(msg.getTags()).
withKeys(msg.getKeys()).
withFlag(msg.getFlag()).
withRocketMessage(msg)
).build();
acknowledgements.add(acknowledgement);
sendMsgAction.accept(toChannel);
instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic())
.markConsumed();
} catch (Exception e) {
logger.error("Rocket Message hasn't been processed successfully. Caused by ", e);
instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic())
.markConsumedFailure();
throw e;
}
}
}
});
return acknowledgements.get(0);
}
}

@ -0,0 +1,33 @@
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.List;
import java.util.function.Consumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.messaging.Message;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements
MessageListenerConcurrently {
public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager,
Consumer<Message> sendMsgAction) {
super(instrumentationManager, sendMsgAction);
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel());
return acknowledgement.getConsumeConcurrentlyStatus();
}
}

@ -0,0 +1,31 @@
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.List;
import java.util.function.Consumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.messaging.Message;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements MessageListenerOrderly {
public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager,
Consumer<Message> sendMsgAction) {
super(instrumentationManager, sendMsgAction);
}
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
return acknowledgement.getConsumeOrderlyStatus();
}
}

@ -0,0 +1,42 @@
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
class ConsumerPropertiesWrapper {
private final String tags;
private final String group;
private final String topic;
private final Set<String> tagsSet;
ConsumerPropertiesWrapper(String group, String topic, String tags) {
this.tags = tags;
this.group = group;
this.topic = topic;
tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim).collect(
Collectors.toSet());
}
String getTags() {
return tags;
}
String getGroup() {
return group;
}
String getTopic() {
return topic;
}
Set<String> getTagsSet() {
return tagsSet;
}
}

@ -0,0 +1,102 @@
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ConsumersManager {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
private final Map<String, Boolean> started = new HashMap<>();
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap
= new HashMap<>();
private final InstrumentationManager instrumentationManager;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
public ConsumersManager(InstrumentationManager instrumentationManager,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.instrumentationManager = instrumentationManager;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
}
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties);
ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(instrumentation);
if (consumerGroups.containsKey(group)) {
return consumerGroups.get(group);
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
consumerGroups.put(group, consumer);
started.put(group, false);
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
logger.info("Rocket consuming for SCS group {} created", group);
return consumer;
}
public synchronized void startConsumers() throws MQClientException {
for (String group : getConsumerGroups()) {
start(group);
}
}
public synchronized void startConsumer(String group) throws MQClientException {
start(group);
}
public synchronized void stopConsumer(String group) {
stop(group);
}
private void stop(String group) {
if (consumerGroups.get(group) != null) {
consumerGroups.get(group).shutdown();
started.put(group, false);
}
}
private synchronized void start(String group) throws MQClientException {
if (started.get(group)) {
return;
}
ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation(
group);
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
try {
consumerGroups.get(group).start();
started.put(group, true);
groupInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
groupInstrumentation.markStartFailed(e);
logger.error("Rocket Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
throw e;
}
}
public synchronized Set<String> getConsumerGroups() {
return consumerGroups.keySet();
}
}

@ -0,0 +1,106 @@
package org.springframework.cloud.stream.binder.rocketmq.integration;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListener;
import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerConcurrently;
import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerOrderly;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private ConsumerInstrumentation consumerInstrumentation;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
private final String destination;
private final String group;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager;
private RetryTemplate retryTemplate;
private RecoveryCallback<? extends Object> recoveryCallback;
public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
String destination, String group,
InstrumentationManager instrumentationManager) {
this.consumersManager = consumersManager;
this.consumerProperties = consumerProperties;
this.destination = destination;
this.group = group;
this.instrumentationManager = instrumentationManager;
}
@Override
protected void doStart() {
if (!consumerProperties.getExtension().getEnabled()) {
return;
}
String tags = consumerProperties == null ? null : consumerProperties.getExtension().getTags();
Boolean isOrderly = consumerProperties == null ? false : consumerProperties.getExtension().getOrderly();
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties);
final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly(
instrumentationManager, msg -> sendMessage(msg))
: new CloudStreamMessageListenerConcurrently(instrumentationManager, msg -> sendMessage(msg));
listener.setConsumerPropertiesWrapper(group, destination, tags);
consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
consumer.subscribe(destination, MessageSelector.bySql(consumerProperties.getExtension().getSql()));
} else {
consumer.subscribe(destination, listener.getTagsString());
}
consumerInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
consumerInstrumentation.markStartFailed(e);
logger.error("Rocket Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e);
throw new RuntimeException("Rocket Consumer hasn't been subscribed.", e);
}
consumer.registerMessageListener(listener);
try {
consumersManager.startConsumer(group);
} catch (MQClientException e) {
logger.error("Rocket Consumer startup failed. Caused by " + e.getErrorMessage(), e);
throw new RuntimeException("Rocket Consumer startup failed.", e);
}
}
@Override
protected void doStop() {
consumersManager.stopConsumer(group);
}
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
}

@ -0,0 +1,131 @@
package org.springframework.cloud.stream.binder.rocketmq.integration;
import java.time.Instant;
import java.util.Map;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.MutableMessage;
import org.springframework.messaging.MessagingException;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
private DefaultMQProducer producer;
private ProducerInstrumentation producerInstrumentation;
private final RocketMQProducerProperties producerProperties;
private final String destination;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
protected volatile boolean running = false;
public RocketMQMessageHandler(String destination, RocketMQProducerProperties producerProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) {
this.destination = destination;
this.producerProperties = producerProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.instrumentationManager = instrumentationManager;
}
@Override
public void start() {
producer = new DefaultMQProducer(destination);
producerInstrumentation = instrumentationManager.getProducerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(producerInstrumentation);
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
if (producerProperties.getMaxMessageSize() > 0) {
producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
}
try {
producer.start();
producerInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
producerInstrumentation.markStartFailed(e);
logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
}
running = true;
}
@Override
public void stop() {
if (producer != null) {
producer.shutdown();
}
running = false;
}
@Override
public boolean isRunning() {
return running;
}
@Override
protected void handleMessageInternal(org.springframework.messaging.Message<?> message) throws Exception {
try {
Message toSend;
if (message.getPayload() instanceof byte[]) {
toSend = new Message(destination, (byte[])message.getPayload());
} else if (message.getPayload() instanceof String) {
toSend = new Message(destination, ((String)message.getPayload()).getBytes());
} else {
throw new UnsupportedOperationException(
"Payload class isn't supported: " + message.getPayload().getClass());
}
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
headerAccessor.setLeaveMutable(true);
toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel());
toSend.setTags(headerAccessor.getTags());
toSend.setKeys(headerAccessor.getKeys());
toSend.setFlag(headerAccessor.getFlag());
for (Map.Entry<String, String> entry : headerAccessor.getUserProperties().entrySet()) {
toSend.putUserProperty(entry.getKey(), entry.getValue());
}
SendResult sendRes = producer.send(toSend);
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new MQClientException("message hasn't been sent", null);
}
if (message instanceof MutableMessage) {
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage)message, sendRes);
}
instrumentationManager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP,
Instant.now().toEpochMilli());
producerInstrumentation.markSent();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException |
UnsupportedOperationException e) {
producerInstrumentation.markSentFailure();
logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
}
}
}

@ -0,0 +1,34 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.concurrent.atomic.AtomicBoolean;
import com.codahale.metrics.MetricRegistry;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ConsumerGroupInstrumentation extends Instrumentation {
private MetricRegistry metricRegistry;
private AtomicBoolean delayedStart = new AtomicBoolean(false);
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
super(name);
this.metricRegistry = metricRegistry;
}
public void markDelayedStart() {
delayedStart.set(true);
}
@Override
public boolean isUp() {
return started.get() || delayedStart.get();
}
@Override
public boolean isOutOfService() {
return !started.get() && startException == null && !delayedStart.get();
}
}

@ -0,0 +1,37 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
/**
* @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ConsumerInstrumentation extends Instrumentation {
private final Counter totalConsumed;
private final Counter totalConsumedFailures;
private final Meter consumedPerSecond;
private final Meter consumedFailuresPerSecond;
public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond"));
this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures"));
this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond"));
}
public void markConsumed() {
totalConsumed.inc();
consumedPerSecond.mark();
}
public void markConsumedFailure() {
totalConsumedFailures.inc();
consumedFailuresPerSecond.mark();
}
}

@ -0,0 +1,50 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class Instrumentation {
private final String name;
protected final AtomicBoolean started = new AtomicBoolean(false);
protected Exception startException = null;
Instrumentation(String name) {
this.name = name;
}
public boolean isDown() {
return startException != null;
}
public boolean isUp() {
return started.get();
}
public boolean isOutOfService() {
return !started.get() && startException == null;
}
public void markStartedSuccessfully() {
started.set(true);
}
public void markStartFailed(Exception e) {
started.set(false);
startException = e;
}
public String getName() {
return name;
}
public boolean isStarted() {
return started.get();
}
public Exception getStartException() {
return startException;
}
}

@ -0,0 +1,57 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.codahale.metrics.MetricRegistry;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class InstrumentationManager {
private final MetricRegistry metricRegistry;
private final Map<String, Object> runtime;
private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>();
private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>();
private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>();
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>();
public InstrumentationManager(MetricRegistry metricRegistry, Map<String, Object> runtime) {
this.metricRegistry = metricRegistry;
this.runtime = runtime;
}
public ProducerInstrumentation getProducerInstrumentation(String destination) {
String key = "scs-rocketmq.producer." + destination;
producerInstrumentations.putIfAbsent(key, new ProducerInstrumentation(metricRegistry, key));
return producerInstrumentations.get(key);
}
public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
String key = "scs-rocketmq.consumer." + destination;
consumeInstrumentations.putIfAbsent(key, new ConsumerInstrumentation(metricRegistry, key));
return consumeInstrumentations.get(key);
}
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
String key = "scs-rocketmq.consumerGroup." + group;
consumerGroupsInstrumentations.putIfAbsent(key, new ConsumerGroupInstrumentation(metricRegistry, key));
return consumerGroupsInstrumentations.get(key);
}
public Set<Instrumentation> getHealthInstrumentations() {
return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toSet());
}
public void addHealthInstrumentation(Instrumentation instrumentation) {
healthInstrumentations.put(instrumentation.getName(), instrumentation);
}
public Map<String, Object> getRuntime() {
return runtime;
}
}

@ -0,0 +1,37 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
/**
* @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ProducerInstrumentation extends Instrumentation {
private final Counter totalSent;
private final Counter totalSentFailures;
private final Meter sentPerSecond;
private final Meter sentFailuresPerSecond;
public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures"));
this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond"));
}
public void markSent() {
totalSent.inc();
sentPerSecond.mark();
}
public void markSentFailure() {
totalSentFailures.inc();
sentFailuresPerSecond.mark();
}
}

@ -0,0 +1,32 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties {
private String namesrvAddr;
private String logLevel = "ERROR";
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getLogLevel() {
return logLevel;
}
public void setLogLevel(String logLevel) {
this.logLevel = logLevel;
}
}

@ -0,0 +1,28 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQBindingProperties {
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
private RocketMQProducerProperties producer = new RocketMQProducerProperties();
public RocketMQConsumerProperties getConsumer() {
return consumer;
}
public void setConsumer(RocketMQConsumerProperties consumer) {
this.consumer = consumer;
}
public RocketMQProducerProperties getProducer() {
return producer;
}
public void setProducer(RocketMQProducerProperties producer) {
this.producer = producer;
}
}

@ -0,0 +1,66 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQConsumerProperties {
/**
* using '||' to split tag
* {@link MQPushConsumer#subscribe(String, String)}
*/
private String tags;
/**
* {@link MQPushConsumer#subscribe(String, MessageSelector)}
* {@link MessageSelector#bySql(String)}
*/
private String sql;
/**
* if orderly is true, using {@link MessageListenerOrderly}
* else if orderly if false, using {@link MessageListenerConcurrently}
*/
private Boolean orderly = false;
private Boolean enabled = true;
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public Boolean getOrderly() {
return orderly;
}
public void setOrderly(Boolean orderly) {
this.orderly = orderly;
}
public Boolean getEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
}

@ -0,0 +1,64 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ConfigurationProperties("spring.cloud.stream.rocketmq")
public class RocketMQExtendedBindingProperties implements
ExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties> {
private Map<String, RocketMQBindingProperties> bindings = new HashMap<>();
public Map<String, RocketMQBindingProperties> getBindings() {
return this.bindings;
}
public void setBindings(Map<String, RocketMQBindingProperties> bindings) {
this.bindings = bindings;
}
@Override
public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getConsumer() != null) {
return bindings.get(channelName).getConsumer();
} else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
this.bindings.get(channelName).setConsumer(properties);
return properties;
}
} else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setConsumer(properties);
bindings.put(channelName, rbp);
return properties;
}
}
@Override
public synchronized RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getProducer() != null) {
return bindings.get(channelName).getProducer();
} else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
this.bindings.get(channelName).setProducer(properties);
return properties;
}
} else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setProducer(properties);
bindings.put(channelName, rbp);
return properties;
}
}
}

@ -0,0 +1,35 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQProducerProperties {
private Boolean enabled = true;
/**
* Maximum allowed message size in bytes
* {@link DefaultMQProducer#maxMessageSize}
*/
private Integer maxMessageSize = 0;
public Boolean getEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
public Integer getMaxMessageSize() {
return maxMessageSize;
}
public void setMaxMessageSize(Integer maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
}

@ -0,0 +1,89 @@
package org.springframework.cloud.stream.binder.rocketmq.provisioning;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQTopicProvisioner
implements
ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>,
ExtendedProducerProperties<RocketMQProducerProperties>> {
private static final Logger logger = LoggerFactory.getLogger(RocketMQTopicProvisioner.class);
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RocketMQProducerProperties>
properties)
throws ProvisioningException {
checkTopic(name);
return new RocketProducerDestination(name);
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties>
properties)
throws ProvisioningException {
checkTopic(name);
return new RocketConsumerDestination(name);
}
private void checkTopic(String topic) {
try {
Validators.checkTopic(topic);
} catch (MQClientException e) {
logger.error("topic check error: " + topic, e);
throw new AssertionError(e); // Can't happen
}
}
private static final class RocketProducerDestination implements ProducerDestination {
private final String producerDestinationName;
RocketProducerDestination(String destinationName) {
this.producerDestinationName = destinationName;
}
@Override
public String getName() {
return producerDestinationName;
}
@Override
public String getNameForPartition(int partition) {
return producerDestinationName;
}
}
private static final class RocketConsumerDestination implements ConsumerDestination {
private final String consumerDestinationName;
RocketConsumerDestination(String consumerDestinationName) {
this.consumerDestinationName = consumerDestinationName;
}
@Override
public String getName() {
return this.consumerDestinationName;
}
}
}

@ -0,0 +1 @@
rocketmq:org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration

@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration
Loading…
Cancel
Save