From 597ff3145b42d97d4540ef794072b35de17d97da Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Thu, 8 Nov 2018 14:37:13 +0800 Subject: [PATCH] add rocketmq binder --- pom.xml | 1 + spring-cloud-alibaba-dependencies/pom.xml | 15 ++ spring-cloud-stream-binder-rocketmq/pom.xml | 77 ++++++++++ .../rocketmq/RocketMQBinderConstants.java | 26 ++++ .../RocketMQMessageChannelBinder.java | 101 ++++++++++++++ .../RocketMQMessageHeaderAccessor.java | 106 ++++++++++++++ .../actuator/RocketMQBinderEndpoint.java | 39 ++++++ .../RocketMQBinderHealthIndicator.java | 37 +++++ .../RocketMQBinderAutoConfiguration.java | 54 ++++++++ ...cketMQBinderEndpointAutoConfiguration.java | 33 +++++ .../rocketmq/consuming/Acknowledgement.java | 65 +++++++++ .../consuming/CloudStreamMessageListener.java | 76 ++++++++++ ...loudStreamMessageListenerConcurrently.java | 33 +++++ .../CloudStreamMessageListenerOrderly.java | 31 +++++ .../consuming/ConsumerPropertiesWrapper.java | 42 ++++++ .../rocketmq/consuming/ConsumersManager.java | 102 ++++++++++++++ .../RocketMQInboundChannelAdapter.java | 106 ++++++++++++++ .../integration/RocketMQMessageHandler.java | 131 ++++++++++++++++++ .../metrics/ConsumerGroupInstrumentation.java | 34 +++++ .../metrics/ConsumerInstrumentation.java | 37 +++++ .../rocketmq/metrics/Instrumentation.java | 50 +++++++ .../metrics/InstrumentationManager.java | 57 ++++++++ .../metrics/ProducerInstrumentation.java | 37 +++++ ...RocketMQBinderConfigurationProperties.java | 32 +++++ .../properties/RocketMQBindingProperties.java | 28 ++++ .../RocketMQConsumerProperties.java | 66 +++++++++ .../RocketMQExtendedBindingProperties.java | 64 +++++++++ .../RocketMQProducerProperties.java | 35 +++++ .../RocketMQTopicProvisioner.java | 89 ++++++++++++ .../main/resources/META-INF/spring.binders | 1 + .../main/resources/META-INF/spring.factories | 2 + 31 files changed, 1607 insertions(+) create mode 100644 spring-cloud-stream-binder-rocketmq/pom.xml create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories diff --git a/pom.xml b/pom.xml index dbeaab9d7..0262e51ff 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,7 @@ spring-cloud-alibaba-sentinel-datasource spring-cloud-alibaba-nacos-config spring-cloud-alibaba-nacos-discovery + spring-cloud-stream-binder-rocketmq spring-cloud-alibaba-examples spring-cloud-alibaba-test spring-cloud-alibaba-docs diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index 33b5ad6af..0bd867eb2 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -24,6 +24,8 @@ 4.0.1 1.0.0 2.16.0 + 4.3.1 + 4.0.3 @@ -115,6 +117,11 @@ sentinel-dubbo-api ${project.version} + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + @@ -202,6 +209,14 @@ spring-cloud-starter-alicloud-acm ${project.version} + + + + io.dropwizard.metrics + metrics-core + ${metrics.core} + + diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml new file mode 100644 index 000000000..9ff00a155 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/pom.xml @@ -0,0 +1,77 @@ + + + + + org.springframework.cloud + spring-cloud-alibaba + 0.2.1.BUILD-SNAPSHOT + + 4.0.0 + + org.springframework.cloud + spring-cloud-stream-binder-rocketmq + Spring Cloud Alibaba RocketMQ Binder + + + + + org.springframework.cloud + spring-cloud-stream + + + + io.dropwizard.metrics + metrics-core + + + + org.apache.rocketmq + rocketmq-client + + + + org.springframework.boot + spring-boot-configuration-processor + provided + true + + + + org.springframework.boot + spring-boot + provided + true + + + + org.springframework.boot + spring-boot-autoconfigure + provided + true + + + + org.springframework.boot + spring-boot-actuator + provided + true + + + + org.springframework.boot + spring-boot-actuator-autoconfigure + provided + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java new file mode 100644 index 000000000..f03fc48eb --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -0,0 +1,26 @@ +package org.springframework.cloud.stream.binder.rocketmq; + +/** + * @author Jim + */ +public interface RocketMQBinderConstants { + + /** + * Header key + */ + String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKET_MESSAGE"; + + String ROCKET_FLAG = "ROCKET_FLAG"; + + String ROCKET_SEND_RESULT = "ROCKET_SEND_RESULT"; + + String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; + + /** + * Instrumentation key + */ + String LASTSEND_TIMESTAMP = "lastSend.timestamp"; + + String ENDPOINT_ID = "rocketmq-binder"; + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java new file mode 100644 index 000000000..da5cf1c75 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -0,0 +1,101 @@ +package org.springframework.cloud.stream.binder.rocketmq; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; +import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; +import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; +import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.integration.core.MessageProducer; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQMessageChannelBinder extends + AbstractMessageChannelBinder, + ExtendedProducerProperties, RocketMQTopicProvisioner> + implements ExtendedPropertiesBinder { + + private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class); + + private final RocketMQExtendedBindingProperties extendedBindingProperties; + private final RocketMQTopicProvisioner rocketTopicProvisioner; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private final InstrumentationManager instrumentationManager; + private final ConsumersManager consumersManager; + + public RocketMQMessageChannelBinder(ConsumersManager consumersManager, + RocketMQExtendedBindingProperties extendedBindingProperties, + RocketMQTopicProvisioner provisioningProvider, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + InstrumentationManager instrumentationManager) { + super(null, provisioningProvider); + this.consumersManager = consumersManager; + this.extendedBindingProperties = extendedBindingProperties; + this.rocketTopicProvisioner = provisioningProvider; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.instrumentationManager = instrumentationManager; + } + + @Override + protected MessageHandler createProducerMessageHandler(ProducerDestination destination, + ExtendedProducerProperties + producerProperties, + MessageChannel errorChannel) throws Exception { + if (producerProperties.getExtension().getEnabled()) { + return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(), + rocketBinderConfigurationProperties, instrumentationManager); + } else { + throw new RuntimeException( + "Binding for channel " + destination.getName() + "has been disabled, message can't be delivered"); + } + } + + @Override + protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, + ExtendedConsumerProperties + consumerProperties) + throws Exception { + if (group == null || "".equals(group)) { + throw new RuntimeException("'group' must be configured for channel + " + destination.getName()); + } + + RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager, + consumerProperties, destination.getName(), group, instrumentationManager); + + ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, + consumerProperties); + if (consumerProperties.getMaxAttempts() > 1) { + rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties)); + rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer()); + } else { + rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel()); + } + + return rocketInboundChannelAdapter; + } + + @Override + public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { + return extendedBindingProperties.getExtendedConsumerProperties(channelName); + } + + @Override + public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { + return extendedBindingProperties.getExtendedProducerProperties(channelName); + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java new file mode 100644 index 000000000..480ec1941 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java @@ -0,0 +1,106 @@ +package org.springframework.cloud.stream.binder.rocketmq; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement; +import org.springframework.integration.support.MutableMessage; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageHeaderAccessor; + +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor { + + public RocketMQMessageHeaderAccessor() { + super(); + } + + public RocketMQMessageHeaderAccessor(Message message) { + super(message); + } + + public Acknowledgement getAcknowledgement(Message message) { + return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class); + } + + public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) { + setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment); + return this; + } + + public String getTags() { + return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, ""); + } + + public RocketMQMessageHeaderAccessor withTags(String tag) { + setHeader(MessageConst.PROPERTY_TAGS, tag); + return this; + } + + public String getKeys() { + return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, ""); + } + + public RocketMQMessageHeaderAccessor withKeys(String keys) { + setHeader(MessageConst.PROPERTY_KEYS, keys); + return this; + } + + public MessageExt getRocketMessage() { + return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class); + } + + public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) { + setHeader(ORIGINAL_ROCKET_MESSAGE, message); + return this; + } + + public Integer getDelayTimeLevel() { + return (Integer)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0); + } + + public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) { + setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); + return this; + } + + public Integer getFlag() { + return (Integer)getMessageHeaders().getOrDefault(ROCKET_FLAG, 0); + } + + public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) { + setHeader(ROCKET_FLAG, delayTimeLevel); + return this; + } + + public SendResult getSendResult() { + return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class); + } + + public static void putSendResult(MutableMessage message, SendResult sendResult) { + message.getHeaders().put(ROCKET_SEND_RESULT, sendResult); + } + + public Map getUserProperties() { + Map result = new HashMap<>(); + for (Map.Entry entry : this.toMap().entrySet()) { + if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry + .getKey().equals(MessageHeaders.CONTENT_TYPE)) { + result.put(entry.getKey(), (String)entry.getValue()); + } + } + return result; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java new file mode 100644 index 000000000..43303a0f2 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java @@ -0,0 +1,39 @@ +package org.springframework.cloud.stream.binder.rocketmq.actuator; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.codahale.metrics.MetricRegistry; +import org.springframework.boot.actuate.endpoint.annotation.Endpoint; +import org.springframework.boot.actuate.endpoint.annotation.ReadOperation; + +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID; + +/** + * @author Timur Valiev + * @author Jim + */ +@Endpoint(id = ENDPOINT_ID) +public class RocketMQBinderEndpoint { + + private MetricRegistry metricRegistry = new MetricRegistry(); + private Map runtime = new ConcurrentHashMap<>(); + + @ReadOperation + public Map invoke() { + Map result = new HashMap<>(); + result.put("metrics", metricRegistry().getMetrics()); + result.put("runtime", runtime()); + return result; + } + + public MetricRegistry metricRegistry() { + return metricRegistry; + } + + public Map runtime() { + return runtime; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java new file mode 100644 index 000000000..7a67902d6 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java @@ -0,0 +1,37 @@ +package org.springframework.cloud.stream.binder.rocketmq.actuator; + +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { + + private final InstrumentationManager instrumentationManager; + + public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) { + this.instrumentationManager = instrumentationManager; + } + + @Override + protected void doHealthCheck(Health.Builder builder) throws Exception { + if (instrumentationManager.getHealthInstrumentations().stream(). + allMatch(Instrumentation::isUp)) { + builder.up(); + return; + } + if (instrumentationManager.getHealthInstrumentations().stream(). + allMatch(Instrumentation::isOutOfService)) { + builder.outOfService(); + return; + } + builder.down(); + instrumentationManager.getHealthInstrumentations().stream(). + filter(instrumentation -> !instrumentation.isStarted()). + forEach(instrumentation1 -> builder.withException(instrumentation1.getStartException())); + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java new file mode 100644 index 000000000..e1b289661 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -0,0 +1,54 @@ +package org.springframework.cloud.stream.binder.rocketmq.config; + +import org.apache.rocketmq.client.log.ClientLogger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; +import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; +import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Timur Valiev + * @author Jim + */ +@Configuration +@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class}) +public class RocketMQBinderAutoConfiguration { + + private final RocketMQExtendedBindingProperties extendedBindingProperties; + + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + + @Autowired + public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { + this.extendedBindingProperties = extendedBindingProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel()); + } + + @Bean + public RocketMQTopicProvisioner provisioningProvider() { + return new RocketMQTopicProvisioner(); + } + + @Bean + public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider, + InstrumentationManager instrumentationManager, + ConsumersManager consumersManager) { + RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties, + provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager); + return binder; + } + + @Bean + public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) { + return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties); + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java new file mode 100644 index 000000000..0df4f51c5 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java @@ -0,0 +1,33 @@ +package org.springframework.cloud.stream.binder.rocketmq.config; + +import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint; +import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Jim + */ +@Configuration +@AutoConfigureAfter(EndpointAutoConfiguration.class) +public class RocketMQBinderEndpointAutoConfiguration { + + @Bean + public RocketMQBinderEndpoint rocketBinderEndpoint() { + return new RocketMQBinderEndpoint(); + } + + @Bean + public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(InstrumentationManager instrumentationManager) { + return new RocketMQBinderHealthIndicator(instrumentationManager); + } + + @Bean + public InstrumentationManager instrumentationManager(RocketMQBinderEndpoint rocketBinderEndpoint) { + return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), rocketBinderEndpoint.runtime()); + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java new file mode 100644 index 000000000..f2c16ed1b --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java @@ -0,0 +1,65 @@ +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; + +/** + * @author Timur Valiev + * @author Jim + */ +public class Acknowledgement { + + /** + * for {@link ConsumeConcurrentlyContext} using + */ + private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + /** + * Message consume retry strategy
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency + */ + private Integer consumeConcurrentlyDelayLevel = 0; + + /** + * for {@link ConsumeOrderlyContext} using + */ + private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS; + private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L; + + public Acknowledgement setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus consumeConcurrentlyStatus) { + this.consumeConcurrentlyStatus = consumeConcurrentlyStatus; + return this; + } + + public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() { + return consumeConcurrentlyStatus; + } + + public ConsumeOrderlyStatus getConsumeOrderlyStatus() { + return consumeOrderlyStatus; + } + + public Acknowledgement setConsumeOrderlyStatus(ConsumeOrderlyStatus consumeOrderlyStatus) { + this.consumeOrderlyStatus = consumeOrderlyStatus; + return this; + } + + public Integer getConsumeConcurrentlyDelayLevel() { + return consumeConcurrentlyDelayLevel; + } + + public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) { + this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel; + } + + public Long getConsumeOrderlySuspendCurrentQueueTimeMill() { + return consumeOrderlySuspendCurrentQueueTimeMill; + } + + public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) { + this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java new file mode 100644 index 000000000..9acd6076f --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java @@ -0,0 +1,76 @@ +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.common.message.MessageExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.StringUtils; + +/** + * @author Timur Valiev + * @author Jim + */ +public abstract class CloudStreamMessageListener implements MessageListener { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private ConsumerPropertiesWrapper consumerPropertiesWrapper; + + private final InstrumentationManager instrumentationManager; + + private final Consumer sendMsgAction; + + CloudStreamMessageListener(InstrumentationManager instrumentationManager, Consumer sendMsgAction) { + this.instrumentationManager = instrumentationManager; + this.sendMsgAction = sendMsgAction; + } + + public String getTagsString() { + return String.join(" || ", consumerPropertiesWrapper.getTagsSet()); + } + + public void setConsumerPropertiesWrapper(String group, String topic, String tags) { + this.consumerPropertiesWrapper = new ConsumerPropertiesWrapper(group, topic, tags); + } + + Acknowledgement consumeMessage(final List msgs) { + List acknowledgements = new ArrayList<>(); + msgs.forEach(msg -> { + logger.info("consuming msg:\n" + msg); + logger.debug("message body:\n" + new String(msg.getBody())); + if (consumerPropertiesWrapper != null && msg.getTopic().equals(consumerPropertiesWrapper.getTopic())) { + if (StringUtils.isEmpty(consumerPropertiesWrapper.getTags()) || consumerPropertiesWrapper.getTagsSet() + .contains(msg.getTags())) { + try { + Acknowledgement acknowledgement = new Acknowledgement(); + Message toChannel = MessageBuilder.withPayload(msg.getBody()). + setHeaders(new RocketMQMessageHeaderAccessor(). + withAcknowledgment(acknowledgement). + withTags(msg.getTags()). + withKeys(msg.getKeys()). + withFlag(msg.getFlag()). + withRocketMessage(msg) + ).build(); + acknowledgements.add(acknowledgement); + sendMsgAction.accept(toChannel); + instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic()) + .markConsumed(); + } catch (Exception e) { + logger.error("Rocket Message hasn't been processed successfully. Caused by ", e); + instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic()) + .markConsumedFailure(); + throw e; + } + } + } + }); + return acknowledgements.get(0); + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java new file mode 100644 index 000000000..8c4df7d88 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java @@ -0,0 +1,33 @@ +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.List; +import java.util.function.Consumer; + +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.messaging.Message; + +/** + * @author Timur Valiev + * @author Jim + */ +public class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements + MessageListenerConcurrently { + + public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager, + Consumer sendMsgAction) { + super(instrumentationManager, sendMsgAction); + } + + @Override + public ConsumeConcurrentlyStatus consumeMessage(final List msgs, + ConsumeConcurrentlyContext context) { + Acknowledgement acknowledgement = consumeMessage(msgs); + context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel()); + return acknowledgement.getConsumeConcurrentlyStatus(); + } + +} \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java new file mode 100644 index 000000000..c63fb9bd9 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java @@ -0,0 +1,31 @@ +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.List; +import java.util.function.Consumer; + +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.messaging.Message; + +/** + * @author Timur Valiev + * @author Jim + */ +public class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements MessageListenerOrderly { + + public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager, + Consumer sendMsgAction) { + super(instrumentationManager, sendMsgAction); + } + + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + Acknowledgement acknowledgement = consumeMessage(msgs); + context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill())); + return acknowledgement.getConsumeOrderlyStatus(); + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java new file mode 100644 index 000000000..0e3b9111c --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java @@ -0,0 +1,42 @@ +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * @author Timur Valiev + * @author Jim + */ +class ConsumerPropertiesWrapper { + private final String tags; + private final String group; + private final String topic; + private final Set tagsSet; + + ConsumerPropertiesWrapper(String group, String topic, String tags) { + this.tags = tags; + this.group = group; + this.topic = topic; + tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim).collect( + Collectors.toSet()); + } + + String getTags() { + return tags; + } + + String getGroup() { + return group; + } + + String getTopic() { + return topic; + } + + Set getTagsSet() { + return tagsSet; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java new file mode 100644 index 000000000..3b6c26476 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java @@ -0,0 +1,102 @@ +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; + +/** + * @author Timur Valiev + * @author Jim + */ +public class ConsumersManager { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final Map consumerGroups = new HashMap<>(); + private final Map started = new HashMap<>(); + private final Map, ExtendedConsumerProperties> propertiesMap + = new HashMap<>(); + private final InstrumentationManager instrumentationManager; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + + public ConsumersManager(InstrumentationManager instrumentationManager, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { + this.instrumentationManager = instrumentationManager; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + } + + public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic, + ExtendedConsumerProperties consumerProperties) { + propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties); + ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group); + instrumentationManager.addHealthInstrumentation(instrumentation); + + if (consumerGroups.containsKey(group)) { + return consumerGroups.get(group); + } + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); + consumerGroups.put(group, consumer); + started.put(group, false); + consumer.setConsumeThreadMax(consumerProperties.getConcurrency()); + consumer.setConsumeThreadMin(consumerProperties.getConcurrency()); + logger.info("Rocket consuming for SCS group {} created", group); + return consumer; + } + + public synchronized void startConsumers() throws MQClientException { + for (String group : getConsumerGroups()) { + start(group); + } + } + + public synchronized void startConsumer(String group) throws MQClientException { + start(group); + } + + public synchronized void stopConsumer(String group) { + stop(group); + } + + private void stop(String group) { + if (consumerGroups.get(group) != null) { + consumerGroups.get(group).shutdown(); + started.put(group, false); + } + } + + private synchronized void start(String group) throws MQClientException { + if (started.get(group)) { + return; + } + ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation( + group); + instrumentationManager.addHealthInstrumentation(groupInstrumentation); + try { + consumerGroups.get(group).start(); + started.put(group, true); + groupInstrumentation.markStartedSuccessfully(); + } catch (MQClientException e) { + groupInstrumentation.markStartFailed(e); + logger.error("Rocket Consumer hasn't been started. Caused by " + e.getErrorMessage(), e); + throw e; + } + } + + public synchronized Set getConsumerGroups() { + return consumerGroups.keySet(); + } +} + diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java new file mode 100644 index 000000000..c5494a4be --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -0,0 +1,106 @@ +package org.springframework.cloud.stream.binder.rocketmq.integration; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.exception.MQClientException; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListener; +import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerConcurrently; +import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerOrderly; +import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; +import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.retry.RecoveryCallback; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.StringUtils; + +/** + * @author Jim + */ +public class RocketMQInboundChannelAdapter extends MessageProducerSupport { + + private ConsumerInstrumentation consumerInstrumentation; + + private final ExtendedConsumerProperties consumerProperties; + + private final String destination; + + private final String group; + + private final InstrumentationManager instrumentationManager; + + private final ConsumersManager consumersManager; + + private RetryTemplate retryTemplate; + + private RecoveryCallback recoveryCallback; + + public RocketMQInboundChannelAdapter(ConsumersManager consumersManager, + ExtendedConsumerProperties consumerProperties, + String destination, String group, + InstrumentationManager instrumentationManager) { + this.consumersManager = consumersManager; + this.consumerProperties = consumerProperties; + this.destination = destination; + this.group = group; + this.instrumentationManager = instrumentationManager; + } + + @Override + protected void doStart() { + if (!consumerProperties.getExtension().getEnabled()) { + return; + } + + String tags = consumerProperties == null ? null : consumerProperties.getExtension().getTags(); + Boolean isOrderly = consumerProperties == null ? false : consumerProperties.getExtension().getOrderly(); + + DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties); + + final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly( + instrumentationManager, msg -> sendMessage(msg)) + : new CloudStreamMessageListenerConcurrently(instrumentationManager, msg -> sendMessage(msg)); + + listener.setConsumerPropertiesWrapper(group, destination, tags); + + consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination); + instrumentationManager.addHealthInstrumentation(consumerInstrumentation); + + try { + if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { + consumer.subscribe(destination, MessageSelector.bySql(consumerProperties.getExtension().getSql())); + } else { + consumer.subscribe(destination, listener.getTagsString()); + } + consumerInstrumentation.markStartedSuccessfully(); + } catch (MQClientException e) { + consumerInstrumentation.markStartFailed(e); + logger.error("Rocket Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e); + throw new RuntimeException("Rocket Consumer hasn't been subscribed.", e); + } + + consumer.registerMessageListener(listener); + + try { + consumersManager.startConsumer(group); + } catch (MQClientException e) { + logger.error("Rocket Consumer startup failed. Caused by " + e.getErrorMessage(), e); + throw new RuntimeException("Rocket Consumer startup failed.", e); + } + } + + @Override + protected void doStop() { + consumersManager.stopConsumer(group); + } + + public void setRetryTemplate(RetryTemplate retryTemplate) { + this.retryTemplate = retryTemplate; + } + + public void setRecoveryCallback(RecoveryCallback recoveryCallback) { + this.recoveryCallback = recoveryCallback; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java new file mode 100644 index 000000000..01bc72458 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -0,0 +1,131 @@ +package org.springframework.cloud.stream.binder.rocketmq.integration; + +import java.time.Instant; +import java.util.Map; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import org.springframework.context.Lifecycle; +import org.springframework.integration.handler.AbstractMessageHandler; +import org.springframework.integration.support.MutableMessage; +import org.springframework.messaging.MessagingException; + +/** + * @author Jim + */ +public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle { + + private DefaultMQProducer producer; + + private ProducerInstrumentation producerInstrumentation; + + private final RocketMQProducerProperties producerProperties; + + private final String destination; + + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + + private final InstrumentationManager instrumentationManager; + + protected volatile boolean running = false; + + public RocketMQMessageHandler(String destination, RocketMQProducerProperties producerProperties, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + InstrumentationManager instrumentationManager) { + this.destination = destination; + this.producerProperties = producerProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.instrumentationManager = instrumentationManager; + } + + @Override + public void start() { + producer = new DefaultMQProducer(destination); + + producerInstrumentation = instrumentationManager.getProducerInstrumentation(destination); + instrumentationManager.addHealthInstrumentation(producerInstrumentation); + + producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); + + if (producerProperties.getMaxMessageSize() > 0) { + producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); + } + + try { + producer.start(); + producerInstrumentation.markStartedSuccessfully(); + } catch (MQClientException e) { + producerInstrumentation.markStartFailed(e); + logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage()); + throw new MessagingException(e.getMessage(), e); + } + running = true; + } + + @Override + public void stop() { + if (producer != null) { + producer.shutdown(); + } + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + protected void handleMessageInternal(org.springframework.messaging.Message message) throws Exception { + try { + Message toSend; + if (message.getPayload() instanceof byte[]) { + toSend = new Message(destination, (byte[])message.getPayload()); + } else if (message.getPayload() instanceof String) { + toSend = new Message(destination, ((String)message.getPayload()).getBytes()); + } else { + throw new UnsupportedOperationException( + "Payload class isn't supported: " + message.getPayload().getClass()); + } + RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); + headerAccessor.setLeaveMutable(true); + toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel()); + toSend.setTags(headerAccessor.getTags()); + toSend.setKeys(headerAccessor.getKeys()); + toSend.setFlag(headerAccessor.getFlag()); + for (Map.Entry entry : headerAccessor.getUserProperties().entrySet()) { + toSend.putUserProperty(entry.getKey(), entry.getValue()); + } + + SendResult sendRes = producer.send(toSend); + + if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { + throw new MQClientException("message hasn't been sent", null); + } + if (message instanceof MutableMessage) { + RocketMQMessageHeaderAccessor.putSendResult((MutableMessage)message, sendRes); + } + instrumentationManager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP, + Instant.now().toEpochMilli()); + producerInstrumentation.markSent(); + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException | + UnsupportedOperationException e) { + producerInstrumentation.markSentFailure(); + logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage()); + throw new MessagingException(e.getMessage(), e); + } + + } + +} \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java new file mode 100644 index 000000000..5b3f9efa9 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java @@ -0,0 +1,34 @@ +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import java.util.concurrent.atomic.AtomicBoolean; + +import com.codahale.metrics.MetricRegistry; + +/** + * @author Timur Valiev + * @author Jim + */ +public class ConsumerGroupInstrumentation extends Instrumentation { + private MetricRegistry metricRegistry; + + private AtomicBoolean delayedStart = new AtomicBoolean(false); + + public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) { + super(name); + this.metricRegistry = metricRegistry; + } + + public void markDelayedStart() { + delayedStart.set(true); + } + + @Override + public boolean isUp() { + return started.get() || delayedStart.get(); + } + + @Override + public boolean isOutOfService() { + return !started.get() && startException == null && !delayedStart.get(); + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java new file mode 100644 index 000000000..7edc2b5f7 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java @@ -0,0 +1,37 @@ +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; + +import static com.codahale.metrics.MetricRegistry.name; + +/** + * @author juven.xuxb + * @author Jim + */ +public class ConsumerInstrumentation extends Instrumentation { + + private final Counter totalConsumed; + private final Counter totalConsumedFailures; + private final Meter consumedPerSecond; + private final Meter consumedFailuresPerSecond; + + public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) { + super(baseMetricName); + this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed")); + this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond")); + this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures")); + this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond")); + } + + public void markConsumed() { + totalConsumed.inc(); + consumedPerSecond.mark(); + } + + public void markConsumedFailure() { + totalConsumedFailures.inc(); + consumedFailuresPerSecond.mark(); + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java new file mode 100644 index 000000000..d91c15835 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java @@ -0,0 +1,50 @@ +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author Timur Valiev + * @author Jim + */ +public class Instrumentation { + private final String name; + protected final AtomicBoolean started = new AtomicBoolean(false); + protected Exception startException = null; + + Instrumentation(String name) { + this.name = name; + } + + public boolean isDown() { + return startException != null; + } + + public boolean isUp() { + return started.get(); + } + + public boolean isOutOfService() { + return !started.get() && startException == null; + } + + public void markStartedSuccessfully() { + started.set(true); + } + + public void markStartFailed(Exception e) { + started.set(false); + startException = e; + } + + public String getName() { + return name; + } + + public boolean isStarted() { + return started.get(); + } + + public Exception getStartException() { + return startException; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java new file mode 100644 index 000000000..0e5d84a36 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java @@ -0,0 +1,57 @@ +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import com.codahale.metrics.MetricRegistry; + +/** + * @author Timur Valiev + * @author Jim + */ +public class InstrumentationManager { + private final MetricRegistry metricRegistry; + private final Map runtime; + private final Map producerInstrumentations = new HashMap<>(); + private final Map consumeInstrumentations = new HashMap<>(); + private final Map consumerGroupsInstrumentations = new HashMap<>(); + + private final Map healthInstrumentations = new HashMap<>(); + + public InstrumentationManager(MetricRegistry metricRegistry, Map runtime) { + this.metricRegistry = metricRegistry; + this.runtime = runtime; + } + + public ProducerInstrumentation getProducerInstrumentation(String destination) { + String key = "scs-rocketmq.producer." + destination; + producerInstrumentations.putIfAbsent(key, new ProducerInstrumentation(metricRegistry, key)); + return producerInstrumentations.get(key); + } + + public ConsumerInstrumentation getConsumerInstrumentation(String destination) { + String key = "scs-rocketmq.consumer." + destination; + consumeInstrumentations.putIfAbsent(key, new ConsumerInstrumentation(metricRegistry, key)); + return consumeInstrumentations.get(key); + } + + public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) { + String key = "scs-rocketmq.consumerGroup." + group; + consumerGroupsInstrumentations.putIfAbsent(key, new ConsumerGroupInstrumentation(metricRegistry, key)); + return consumerGroupsInstrumentations.get(key); + } + + public Set getHealthInstrumentations() { + return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toSet()); + } + + public void addHealthInstrumentation(Instrumentation instrumentation) { + healthInstrumentations.put(instrumentation.getName(), instrumentation); + } + + public Map getRuntime() { + return runtime; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java new file mode 100644 index 000000000..7444d9ce7 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java @@ -0,0 +1,37 @@ +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; + +import static com.codahale.metrics.MetricRegistry.name; + +/** + * @author juven.xuxb + * @author Jim + */ +public class ProducerInstrumentation extends Instrumentation { + + private final Counter totalSent; + private final Counter totalSentFailures; + private final Meter sentPerSecond; + private final Meter sentFailuresPerSecond; + + public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) { + super(baseMetricName); + this.totalSent = registry.counter(name(baseMetricName, "totalSent")); + this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures")); + this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond")); + this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond")); + } + + public void markSent() { + totalSent.inc(); + sentPerSecond.mark(); + } + + public void markSentFailure() { + totalSentFailures.inc(); + sentFailuresPerSecond.mark(); + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java new file mode 100644 index 000000000..b960bc320 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -0,0 +1,32 @@ +package org.springframework.cloud.stream.binder.rocketmq.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * @author Timur Valiev + * @author Jim + */ +@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") +public class RocketMQBinderConfigurationProperties { + + private String namesrvAddr; + + private String logLevel = "ERROR"; + + public String getNamesrvAddr() { + return namesrvAddr; + } + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + public String getLogLevel() { + return logLevel; + } + + public void setLogLevel(String logLevel) { + this.logLevel = logLevel; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java new file mode 100644 index 000000000..4cefbd393 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java @@ -0,0 +1,28 @@ +package org.springframework.cloud.stream.binder.rocketmq.properties; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQBindingProperties { + + private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties(); + + private RocketMQProducerProperties producer = new RocketMQProducerProperties(); + + public RocketMQConsumerProperties getConsumer() { + return consumer; + } + + public void setConsumer(RocketMQConsumerProperties consumer) { + this.consumer = consumer; + } + + public RocketMQProducerProperties getProducer() { + return producer; + } + + public void setProducer(RocketMQProducerProperties producer) { + this.producer = producer; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java new file mode 100644 index 000000000..410914acd --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -0,0 +1,66 @@ +package org.springframework.cloud.stream.binder.rocketmq.properties; + +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQConsumerProperties { + + /** + * using '||' to split tag + * {@link MQPushConsumer#subscribe(String, String)} + */ + private String tags; + + /** + * {@link MQPushConsumer#subscribe(String, MessageSelector)} + * {@link MessageSelector#bySql(String)} + */ + private String sql; + + /** + * if orderly is true, using {@link MessageListenerOrderly} + * else if orderly if false, using {@link MessageListenerConcurrently} + */ + private Boolean orderly = false; + + private Boolean enabled = true; + + public String getTags() { + return tags; + } + + public void setTags(String tags) { + this.tags = tags; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public Boolean getOrderly() { + return orderly; + } + + public void setOrderly(Boolean orderly) { + this.orderly = orderly; + } + + public Boolean getEnabled() { + return enabled; + } + + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java new file mode 100644 index 000000000..8f3a30ef7 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java @@ -0,0 +1,64 @@ +package org.springframework.cloud.stream.binder.rocketmq.properties; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.stream.binder.ExtendedBindingProperties; + +/** + * @author Timur Valiev + * @author Jim + */ +@ConfigurationProperties("spring.cloud.stream.rocketmq") +public class RocketMQExtendedBindingProperties implements + ExtendedBindingProperties { + + private Map bindings = new HashMap<>(); + + public Map getBindings() { + return this.bindings; + } + + public void setBindings(Map bindings) { + this.bindings = bindings; + } + + @Override + public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { + if (bindings.containsKey(channelName)) { + if (bindings.get(channelName).getConsumer() != null) { + return bindings.get(channelName).getConsumer(); + } else { + RocketMQConsumerProperties properties = new RocketMQConsumerProperties(); + this.bindings.get(channelName).setConsumer(properties); + return properties; + } + } else { + RocketMQConsumerProperties properties = new RocketMQConsumerProperties(); + RocketMQBindingProperties rbp = new RocketMQBindingProperties(); + rbp.setConsumer(properties); + bindings.put(channelName, rbp); + return properties; + } + } + + @Override + public synchronized RocketMQProducerProperties getExtendedProducerProperties(String channelName) { + if (bindings.containsKey(channelName)) { + if (bindings.get(channelName).getProducer() != null) { + return bindings.get(channelName).getProducer(); + } else { + RocketMQProducerProperties properties = new RocketMQProducerProperties(); + this.bindings.get(channelName).setProducer(properties); + return properties; + } + } else { + RocketMQProducerProperties properties = new RocketMQProducerProperties(); + RocketMQBindingProperties rbp = new RocketMQBindingProperties(); + rbp.setProducer(properties); + bindings.put(channelName, rbp); + return properties; + } + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java new file mode 100644 index 000000000..4daed981b --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -0,0 +1,35 @@ +package org.springframework.cloud.stream.binder.rocketmq.properties; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQProducerProperties { + + private Boolean enabled = true; + + /** + * Maximum allowed message size in bytes + * {@link DefaultMQProducer#maxMessageSize} + */ + private Integer maxMessageSize = 0; + + public Boolean getEnabled() { + return enabled; + } + + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + + public Integer getMaxMessageSize() { + return maxMessageSize; + } + + public void setMaxMessageSize(Integer maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java new file mode 100644 index 000000000..b6a688efe --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java @@ -0,0 +1,89 @@ +package org.springframework.cloud.stream.binder.rocketmq.provisioning; + +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.exception.MQClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.cloud.stream.provisioning.ProvisioningException; +import org.springframework.cloud.stream.provisioning.ProvisioningProvider; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQTopicProvisioner + implements + ProvisioningProvider, + ExtendedProducerProperties> { + + private static final Logger logger = LoggerFactory.getLogger(RocketMQTopicProvisioner.class); + + @Override + public ProducerDestination provisionProducerDestination(String name, + ExtendedProducerProperties + properties) + throws ProvisioningException { + checkTopic(name); + return new RocketProducerDestination(name); + } + + @Override + public ConsumerDestination provisionConsumerDestination(String name, String group, + ExtendedConsumerProperties + properties) + throws ProvisioningException { + checkTopic(name); + return new RocketConsumerDestination(name); + } + + private void checkTopic(String topic) { + try { + Validators.checkTopic(topic); + } catch (MQClientException e) { + logger.error("topic check error: " + topic, e); + throw new AssertionError(e); // Can't happen + } + } + + private static final class RocketProducerDestination implements ProducerDestination { + + private final String producerDestinationName; + + RocketProducerDestination(String destinationName) { + this.producerDestinationName = destinationName; + } + + @Override + public String getName() { + return producerDestinationName; + } + + @Override + public String getNameForPartition(int partition) { + return producerDestinationName; + } + + } + + private static final class RocketConsumerDestination implements ConsumerDestination { + + private final String consumerDestinationName; + + RocketConsumerDestination(String consumerDestinationName) { + this.consumerDestinationName = consumerDestinationName; + } + + @Override + public String getName() { + return this.consumerDestinationName; + } + + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders new file mode 100644 index 000000000..78d86cf92 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders @@ -0,0 +1 @@ +rocketmq:org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..43b85129a --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration