diff --git a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml
index 5c793f8ae..1afc1ab97 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml
@@ -6,7 +6,6 @@
org.springframework.cloud
spring-cloud-alibaba-examples
0.2.1.BUILD-SNAPSHOT
- ../../pom.xml
4.0.0
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
index da5cf1c75..cb3d5cad4 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -26,76 +26,84 @@ import org.springframework.messaging.MessageHandler;
* @author Jim
*/
public class RocketMQMessageChannelBinder extends
- AbstractMessageChannelBinder,
- ExtendedProducerProperties, RocketMQTopicProvisioner>
- implements ExtendedPropertiesBinder {
+ AbstractMessageChannelBinder, ExtendedProducerProperties, RocketMQTopicProvisioner>
+ implements
+ ExtendedPropertiesBinder {
- private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class);
+ private static final Logger logger = LoggerFactory
+ .getLogger(RocketMQMessageChannelBinder.class);
- private final RocketMQExtendedBindingProperties extendedBindingProperties;
- private final RocketMQTopicProvisioner rocketTopicProvisioner;
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
- private final InstrumentationManager instrumentationManager;
- private final ConsumersManager consumersManager;
+ private final RocketMQExtendedBindingProperties extendedBindingProperties;
+ private final RocketMQTopicProvisioner rocketTopicProvisioner;
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+ private final InstrumentationManager instrumentationManager;
+ private final ConsumersManager consumersManager;
- public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
- RocketMQExtendedBindingProperties extendedBindingProperties,
- RocketMQTopicProvisioner provisioningProvider,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
- InstrumentationManager instrumentationManager) {
- super(null, provisioningProvider);
- this.consumersManager = consumersManager;
- this.extendedBindingProperties = extendedBindingProperties;
- this.rocketTopicProvisioner = provisioningProvider;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- this.instrumentationManager = instrumentationManager;
- }
+ public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
+ RocketMQExtendedBindingProperties extendedBindingProperties,
+ RocketMQTopicProvisioner provisioningProvider,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
+ InstrumentationManager instrumentationManager) {
+ super(null, provisioningProvider);
+ this.consumersManager = consumersManager;
+ this.extendedBindingProperties = extendedBindingProperties;
+ this.rocketTopicProvisioner = provisioningProvider;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ this.instrumentationManager = instrumentationManager;
+ }
- @Override
- protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
- ExtendedProducerProperties
- producerProperties,
- MessageChannel errorChannel) throws Exception {
- if (producerProperties.getExtension().getEnabled()) {
- return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(),
- rocketBinderConfigurationProperties, instrumentationManager);
- } else {
- throw new RuntimeException(
- "Binding for channel " + destination.getName() + "has been disabled, message can't be delivered");
- }
- }
+ @Override
+ protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
+ ExtendedProducerProperties producerProperties,
+ MessageChannel errorChannel) throws Exception {
+ if (producerProperties.getExtension().getEnabled()) {
+ return new RocketMQMessageHandler(destination.getName(),
+ producerProperties.getExtension(),
+ rocketBinderConfigurationProperties, instrumentationManager);
+ }
+ else {
+ throw new RuntimeException("Binding for channel " + destination.getName()
+ + "has been disabled, message can't be delivered");
+ }
+ }
- @Override
- protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
- ExtendedConsumerProperties
- consumerProperties)
- throws Exception {
- if (group == null || "".equals(group)) {
- throw new RuntimeException("'group' must be configured for channel + " + destination.getName());
- }
+ @Override
+ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
+ String group,
+ ExtendedConsumerProperties consumerProperties)
+ throws Exception {
+ if (group == null || "".equals(group)) {
+ throw new RuntimeException(
+ "'group' must be configured for channel + " + destination.getName());
+ }
- RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager,
- consumerProperties, destination.getName(), group, instrumentationManager);
+ RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
+ consumersManager, consumerProperties, destination.getName(), group,
+ instrumentationManager);
- ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group,
- consumerProperties);
- if (consumerProperties.getMaxAttempts() > 1) {
- rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));
- rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
- } else {
- rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
- }
+ ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
+ group, consumerProperties);
+ if (consumerProperties.getMaxAttempts() > 1) {
+ rocketInboundChannelAdapter
+ .setRetryTemplate(buildRetryTemplate(consumerProperties));
+ rocketInboundChannelAdapter
+ .setRecoveryCallback(errorInfrastructure.getRecoverer());
+ }
+ else {
+ rocketInboundChannelAdapter
+ .setErrorChannel(errorInfrastructure.getErrorChannel());
+ }
- return rocketInboundChannelAdapter;
- }
+ return rocketInboundChannelAdapter;
+ }
- @Override
- public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
- return extendedBindingProperties.getExtendedConsumerProperties(channelName);
- }
+ @Override
+ public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
+ return extendedBindingProperties.getExtendedConsumerProperties(channelName);
+ }
- @Override
- public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
- return extendedBindingProperties.getExtendedProducerProperties(channelName);
- }
+ @Override
+ public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
+ return extendedBindingProperties.getExtendedProducerProperties(channelName);
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
index 480ec1941..e746933b1 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
@@ -1,5 +1,10 @@
package org.springframework.cloud.stream.binder.rocketmq;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
+
import java.util.HashMap;
import java.util.Map;
@@ -12,95 +17,93 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageHeaderAccessor;
-import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
-import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
-import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
-import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
-
/**
* @author Timur Valiev
* @author Jim
*/
public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
- public RocketMQMessageHeaderAccessor() {
- super();
- }
-
- public RocketMQMessageHeaderAccessor(Message> message) {
- super(message);
- }
-
- public Acknowledgement getAcknowledgement(Message message) {
- return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
- }
-
- public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) {
- setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
- return this;
- }
-
- public String getTags() {
- return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, "");
- }
-
- public RocketMQMessageHeaderAccessor withTags(String tag) {
- setHeader(MessageConst.PROPERTY_TAGS, tag);
- return this;
- }
-
- public String getKeys() {
- return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, "");
- }
-
- public RocketMQMessageHeaderAccessor withKeys(String keys) {
- setHeader(MessageConst.PROPERTY_KEYS, keys);
- return this;
- }
-
- public MessageExt getRocketMessage() {
- return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class);
- }
-
- public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) {
- setHeader(ORIGINAL_ROCKET_MESSAGE, message);
- return this;
- }
-
- public Integer getDelayTimeLevel() {
- return (Integer)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
- }
-
- public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
- setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
- return this;
- }
-
- public Integer getFlag() {
- return (Integer)getMessageHeaders().getOrDefault(ROCKET_FLAG, 0);
- }
-
- public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
- setHeader(ROCKET_FLAG, delayTimeLevel);
- return this;
- }
-
- public SendResult getSendResult() {
- return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
- }
-
- public static void putSendResult(MutableMessage message, SendResult sendResult) {
- message.getHeaders().put(ROCKET_SEND_RESULT, sendResult);
- }
-
- public Map getUserProperties() {
- Map result = new HashMap<>();
- for (Map.Entry entry : this.toMap().entrySet()) {
- if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry
- .getKey().equals(MessageHeaders.CONTENT_TYPE)) {
- result.put(entry.getKey(), (String)entry.getValue());
- }
- }
- return result;
- }
+ public RocketMQMessageHeaderAccessor() {
+ super();
+ }
+
+ public RocketMQMessageHeaderAccessor(Message> message) {
+ super(message);
+ }
+
+ public Acknowledgement getAcknowledgement(Message message) {
+ return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
+ }
+
+ public RocketMQMessageHeaderAccessor withAcknowledgment(
+ Acknowledgement acknowledgment) {
+ setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
+ return this;
+ }
+
+ public String getTags() {
+ return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, "");
+ }
+
+ public RocketMQMessageHeaderAccessor withTags(String tag) {
+ setHeader(MessageConst.PROPERTY_TAGS, tag);
+ return this;
+ }
+
+ public String getKeys() {
+ return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, "");
+ }
+
+ public RocketMQMessageHeaderAccessor withKeys(String keys) {
+ setHeader(MessageConst.PROPERTY_KEYS, keys);
+ return this;
+ }
+
+ public MessageExt getRocketMessage() {
+ return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class);
+ }
+
+ public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) {
+ setHeader(ORIGINAL_ROCKET_MESSAGE, message);
+ return this;
+ }
+
+ public Integer getDelayTimeLevel() {
+ return (Integer) getMessageHeaders()
+ .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
+ }
+
+ public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
+ setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
+ return this;
+ }
+
+ public Integer getFlag() {
+ return (Integer) getMessageHeaders().getOrDefault(ROCKET_FLAG, 0);
+ }
+
+ public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
+ setHeader(ROCKET_FLAG, delayTimeLevel);
+ return this;
+ }
+
+ public SendResult getSendResult() {
+ return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
+ }
+
+ public static void putSendResult(MutableMessage message, SendResult sendResult) {
+ message.getHeaders().put(ROCKET_SEND_RESULT, sendResult);
+ }
+
+ public Map getUserProperties() {
+ Map result = new HashMap<>();
+ for (Map.Entry entry : this.toMap().entrySet()) {
+ if (entry.getValue() instanceof String
+ && !MessageConst.STRING_HASH_SET.contains(entry.getKey())
+ && !entry.getKey().equals(MessageHeaders.CONTENT_TYPE)) {
+ result.put(entry.getKey(), (String) entry.getValue());
+ }
+ }
+ return result;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
index 43303a0f2..5a7f57f53 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
@@ -1,14 +1,15 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
+
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import com.codahale.metrics.MetricRegistry;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
-import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
+import com.codahale.metrics.MetricRegistry;
/**
* @author Timur Valiev
@@ -17,23 +18,23 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon
@Endpoint(id = ENDPOINT_ID)
public class RocketMQBinderEndpoint {
- private MetricRegistry metricRegistry = new MetricRegistry();
- private Map runtime = new ConcurrentHashMap<>();
+ private MetricRegistry metricRegistry = new MetricRegistry();
+ private Map runtime = new ConcurrentHashMap<>();
- @ReadOperation
- public Map invoke() {
- Map result = new HashMap<>();
- result.put("metrics", metricRegistry().getMetrics());
- result.put("runtime", runtime());
- return result;
- }
+ @ReadOperation
+ public Map invoke() {
+ Map result = new HashMap<>();
+ result.put("metrics", metricRegistry().getMetrics());
+ result.put("runtime", runtime());
+ return result;
+ }
- public MetricRegistry metricRegistry() {
- return metricRegistry;
- }
+ public MetricRegistry metricRegistry() {
+ return metricRegistry;
+ }
- public Map runtime() {
- return runtime;
- }
+ public Map runtime() {
+ return runtime;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
index 7a67902d6..82f4ab5ff 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
@@ -11,27 +11,28 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM
*/
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
- private final InstrumentationManager instrumentationManager;
+ private final InstrumentationManager instrumentationManager;
- public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
- this.instrumentationManager = instrumentationManager;
- }
+ public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
+ this.instrumentationManager = instrumentationManager;
+ }
- @Override
- protected void doHealthCheck(Health.Builder builder) throws Exception {
- if (instrumentationManager.getHealthInstrumentations().stream().
- allMatch(Instrumentation::isUp)) {
- builder.up();
- return;
- }
- if (instrumentationManager.getHealthInstrumentations().stream().
- allMatch(Instrumentation::isOutOfService)) {
- builder.outOfService();
- return;
- }
- builder.down();
- instrumentationManager.getHealthInstrumentations().stream().
- filter(instrumentation -> !instrumentation.isStarted()).
- forEach(instrumentation1 -> builder.withException(instrumentation1.getStartException()));
- }
+ @Override
+ protected void doHealthCheck(Health.Builder builder) throws Exception {
+ if (instrumentationManager.getHealthInstrumentations().stream()
+ .allMatch(Instrumentation::isUp)) {
+ builder.up();
+ return;
+ }
+ if (instrumentationManager.getHealthInstrumentations().stream()
+ .allMatch(Instrumentation::isOutOfService)) {
+ builder.outOfService();
+ return;
+ }
+ builder.down();
+ instrumentationManager.getHealthInstrumentations().stream()
+ .filter(instrumentation -> !instrumentation.isStarted())
+ .forEach(instrumentation1 -> builder
+ .withException(instrumentation1.getStartException()));
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
index e1b289661..713edb5cf 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
@@ -17,38 +17,45 @@ import org.springframework.context.annotation.Configuration;
* @author Jim
*/
@Configuration
-@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class})
+@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class,
+ RocketMQExtendedBindingProperties.class })
public class RocketMQBinderAutoConfiguration {
- private final RocketMQExtendedBindingProperties extendedBindingProperties;
-
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
-
- @Autowired
- public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
- this.extendedBindingProperties = extendedBindingProperties;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel());
- }
-
- @Bean
- public RocketMQTopicProvisioner provisioningProvider() {
- return new RocketMQTopicProvisioner();
- }
-
- @Bean
- public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
- InstrumentationManager instrumentationManager,
- ConsumersManager consumersManager) {
- RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties,
- provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager);
- return binder;
- }
-
- @Bean
- public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) {
- return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties);
- }
+ private final RocketMQExtendedBindingProperties extendedBindingProperties;
+
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ @Autowired
+ public RocketMQBinderAutoConfiguration(
+ RocketMQExtendedBindingProperties extendedBindingProperties,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
+ this.extendedBindingProperties = extendedBindingProperties;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ System.setProperty(ClientLogger.CLIENT_LOG_LEVEL,
+ this.rocketBinderConfigurationProperties.getLogLevel());
+ }
+
+ @Bean
+ public RocketMQTopicProvisioner provisioningProvider() {
+ return new RocketMQTopicProvisioner();
+ }
+
+ @Bean
+ public RocketMQMessageChannelBinder rocketMessageChannelBinder(
+ RocketMQTopicProvisioner provisioningProvider,
+ InstrumentationManager instrumentationManager,
+ ConsumersManager consumersManager) {
+ RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
+ consumersManager, extendedBindingProperties, provisioningProvider,
+ rocketBinderConfigurationProperties, instrumentationManager);
+ return binder;
+ }
+
+ @Bean
+ public ConsumersManager consumersManager(
+ InstrumentationManager instrumentationManager) {
+ return new ConsumersManager(instrumentationManager,
+ rocketBinderConfigurationProperties);
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java
index 0df4f51c5..1b0bd0f51 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java
@@ -15,19 +15,22 @@ import org.springframework.context.annotation.Configuration;
@AutoConfigureAfter(EndpointAutoConfiguration.class)
public class RocketMQBinderEndpointAutoConfiguration {
- @Bean
- public RocketMQBinderEndpoint rocketBinderEndpoint() {
- return new RocketMQBinderEndpoint();
- }
+ @Bean
+ public RocketMQBinderEndpoint rocketBinderEndpoint() {
+ return new RocketMQBinderEndpoint();
+ }
- @Bean
- public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(InstrumentationManager instrumentationManager) {
- return new RocketMQBinderHealthIndicator(instrumentationManager);
- }
+ @Bean
+ public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(
+ InstrumentationManager instrumentationManager) {
+ return new RocketMQBinderHealthIndicator(instrumentationManager);
+ }
- @Bean
- public InstrumentationManager instrumentationManager(RocketMQBinderEndpoint rocketBinderEndpoint) {
- return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), rocketBinderEndpoint.runtime());
- }
+ @Bean
+ public InstrumentationManager instrumentationManager(
+ RocketMQBinderEndpoint rocketBinderEndpoint) {
+ return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(),
+ rocketBinderEndpoint.runtime());
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java
index 675894742..1efe40e4c 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java
@@ -11,68 +11,72 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
*/
public class Acknowledgement {
- /**
- * for {@link ConsumeConcurrentlyContext} using
- */
- private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- /**
- * Message consume retry strategy
- * -1,no retry,put into DLQ directly
- * 0,broker control retry frequency
- * >0,client control retry frequency
- */
- private Integer consumeConcurrentlyDelayLevel = 0;
+ /**
+ * for {@link ConsumeConcurrentlyContext} using
+ */
+ private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ /**
+ * Message consume retry strategy
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency
+ */
+ private Integer consumeConcurrentlyDelayLevel = 0;
- /**
- * for {@link ConsumeOrderlyContext} using
- */
- private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
- private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L;
+ /**
+ * for {@link ConsumeOrderlyContext} using
+ */
+ private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
+ private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L;
- public Acknowledgement setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
- this.consumeConcurrentlyStatus = consumeConcurrentlyStatus;
- return this;
- }
+ public Acknowledgement setConsumeConcurrentlyStatus(
+ ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
+ this.consumeConcurrentlyStatus = consumeConcurrentlyStatus;
+ return this;
+ }
- public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() {
- return consumeConcurrentlyStatus;
- }
+ public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() {
+ return consumeConcurrentlyStatus;
+ }
- public ConsumeOrderlyStatus getConsumeOrderlyStatus() {
- return consumeOrderlyStatus;
- }
+ public ConsumeOrderlyStatus getConsumeOrderlyStatus() {
+ return consumeOrderlyStatus;
+ }
- public Acknowledgement setConsumeOrderlyStatus(ConsumeOrderlyStatus consumeOrderlyStatus) {
- this.consumeOrderlyStatus = consumeOrderlyStatus;
- return this;
- }
+ public Acknowledgement setConsumeOrderlyStatus(
+ ConsumeOrderlyStatus consumeOrderlyStatus) {
+ this.consumeOrderlyStatus = consumeOrderlyStatus;
+ return this;
+ }
- public Integer getConsumeConcurrentlyDelayLevel() {
- return consumeConcurrentlyDelayLevel;
- }
+ public Integer getConsumeConcurrentlyDelayLevel() {
+ return consumeConcurrentlyDelayLevel;
+ }
- public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) {
- this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel;
- }
+ public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) {
+ this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel;
+ }
- public Long getConsumeOrderlySuspendCurrentQueueTimeMill() {
- return consumeOrderlySuspendCurrentQueueTimeMill;
- }
+ public Long getConsumeOrderlySuspendCurrentQueueTimeMill() {
+ return consumeOrderlySuspendCurrentQueueTimeMill;
+ }
- public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) {
- this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
- }
+ public void setConsumeOrderlySuspendCurrentQueueTimeMill(
+ Long consumeOrderlySuspendCurrentQueueTimeMill) {
+ this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
+ }
- public static Acknowledgement buildOrderlyInstance() {
- Acknowledgement acknowledgement = new Acknowledgement();
- acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS);
- return acknowledgement;
- }
+ public static Acknowledgement buildOrderlyInstance() {
+ Acknowledgement acknowledgement = new Acknowledgement();
+ acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS);
+ return acknowledgement;
+ }
- public static Acknowledgement buildConcurrentlyInstance() {
- Acknowledgement acknowledgement = new Acknowledgement();
- acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
- return acknowledgement;
- }
+ public static Acknowledgement buildConcurrentlyInstance() {
+ Acknowledgement acknowledgement = new Acknowledgement();
+ acknowledgement
+ .setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
+ return acknowledgement;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java
index 04606915d..4184e3f41 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java
@@ -22,85 +22,88 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsu
*/
public class ConsumersManager {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
- private final Map consumerGroups = new HashMap<>();
- private final Map started = new HashMap<>();
- private final Map, ExtendedConsumerProperties> propertiesMap
- = new HashMap<>();
- private final InstrumentationManager instrumentationManager;
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
-
- public ConsumersManager(InstrumentationManager instrumentationManager,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
- this.instrumentationManager = instrumentationManager;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- }
-
- public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic,
- ExtendedConsumerProperties consumerProperties) {
- propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties);
- ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group);
- instrumentationManager.addHealthInstrumentation(instrumentation);
-
- if (consumerGroups.containsKey(group)) {
- return consumerGroups.get(group);
- }
-
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
- consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
- consumerGroups.put(group, consumer);
- started.put(group, false);
- consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
- consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
- if (consumerProperties.getExtension().getBroadcasting()) {
- consumer.setMessageModel(MessageModel.BROADCASTING);
- }
- logger.info("RocketMQ consuming for SCS group {} created", group);
- return consumer;
- }
-
- public synchronized void startConsumers() throws MQClientException {
- for (String group : getConsumerGroups()) {
- start(group);
- }
- }
-
- public synchronized void startConsumer(String group) throws MQClientException {
- start(group);
- }
-
- public synchronized void stopConsumer(String group) {
- stop(group);
- }
-
- private void stop(String group) {
- if (consumerGroups.get(group) != null) {
- consumerGroups.get(group).shutdown();
- started.put(group, false);
- }
- }
-
- private synchronized void start(String group) throws MQClientException {
- if (started.get(group)) {
- return;
- }
- ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation(
- group);
- instrumentationManager.addHealthInstrumentation(groupInstrumentation);
- try {
- consumerGroups.get(group).start();
- started.put(group, true);
- groupInstrumentation.markStartedSuccessfully();
- } catch (MQClientException e) {
- groupInstrumentation.markStartFailed(e);
- logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
- throw e;
- }
- }
-
- public synchronized Set getConsumerGroups() {
- return consumerGroups.keySet();
- }
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final Map consumerGroups = new HashMap<>();
+ private final Map started = new HashMap<>();
+ private final Map, ExtendedConsumerProperties> propertiesMap = new HashMap<>();
+ private final InstrumentationManager instrumentationManager;
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ public ConsumersManager(InstrumentationManager instrumentationManager,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
+ this.instrumentationManager = instrumentationManager;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ }
+
+ public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group,
+ String topic,
+ ExtendedConsumerProperties consumerProperties) {
+ propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic),
+ consumerProperties);
+ ConsumerGroupInstrumentation instrumentation = instrumentationManager
+ .getConsumerGroupInstrumentation(group);
+ instrumentationManager.addHealthInstrumentation(instrumentation);
+
+ if (consumerGroups.containsKey(group)) {
+ return consumerGroups.get(group);
+ }
+
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+ consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
+ consumerGroups.put(group, consumer);
+ started.put(group, false);
+ consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
+ consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
+ if (consumerProperties.getExtension().getBroadcasting()) {
+ consumer.setMessageModel(MessageModel.BROADCASTING);
+ }
+ logger.info("RocketMQ consuming for SCS group {} created", group);
+ return consumer;
+ }
+
+ public synchronized void startConsumers() throws MQClientException {
+ for (String group : getConsumerGroups()) {
+ start(group);
+ }
+ }
+
+ public synchronized void startConsumer(String group) throws MQClientException {
+ start(group);
+ }
+
+ public synchronized void stopConsumer(String group) {
+ stop(group);
+ }
+
+ private void stop(String group) {
+ if (consumerGroups.get(group) != null) {
+ consumerGroups.get(group).shutdown();
+ started.put(group, false);
+ }
+ }
+
+ private synchronized void start(String group) throws MQClientException {
+ if (started.get(group)) {
+ return;
+ }
+ ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager
+ .getConsumerGroupInstrumentation(group);
+ instrumentationManager.addHealthInstrumentation(groupInstrumentation);
+ try {
+ consumerGroups.get(group).start();
+ started.put(group, true);
+ groupInstrumentation.markStartedSuccessfully();
+ }
+ catch (MQClientException e) {
+ groupInstrumentation.markStartFailed(e);
+ logger.error("RocketMQ Consumer hasn't been started. Caused by "
+ + e.getErrorMessage(), e);
+ throw e;
+ }
+ }
+
+ public synchronized Set getConsumerGroups() {
+ return consumerGroups.keySet();
+ }
}
-
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
index 8f3ddacb6..6b22c400c 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
@@ -43,211 +43,249 @@ import org.springframework.util.StringUtils;
*/
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
- private static final Logger logger = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class);
-
- private ConsumerInstrumentation consumerInstrumentation;
-
- private final ExtendedConsumerProperties consumerProperties;
-
- private final String destination;
-
- private final String group;
-
- private final InstrumentationManager instrumentationManager;
-
- private final ConsumersManager consumersManager;
-
- private RetryTemplate retryTemplate;
-
- private RecoveryCallback extends Object> recoveryCallback;
-
- public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
- ExtendedConsumerProperties consumerProperties,
- String destination, String group,
- InstrumentationManager instrumentationManager) {
- this.consumersManager = consumersManager;
- this.consumerProperties = consumerProperties;
- this.destination = destination;
- this.group = group;
- this.instrumentationManager = instrumentationManager;
- }
-
- @Override
- protected void doStart() {
- if (!consumerProperties.getExtension().getEnabled()) {
- return;
- }
-
- String tags = consumerProperties == null ? null : consumerProperties.getExtension().getTags();
- Boolean isOrderly = consumerProperties == null ? false : consumerProperties.getExtension().getOrderly();
-
- DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties);
-
- final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly(
- instrumentationManager)
- : new CloudStreamMessageListenerConcurrently(instrumentationManager);
-
- if (retryTemplate != null) {
- retryTemplate.registerListener(listener);
- }
-
- Set tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim)
- .collect(Collectors.toSet());
-
- consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination);
- instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
-
- try {
- if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
- consumer.subscribe(destination, MessageSelector.bySql(consumerProperties.getExtension().getSql()));
- } else {
- consumer.subscribe(destination, String.join(" || ", tagsSet));
- }
- consumerInstrumentation.markStartedSuccessfully();
- } catch (MQClientException e) {
- consumerInstrumentation.markStartFailed(e);
- logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e);
- throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
- }
-
- consumer.registerMessageListener(listener);
-
- try {
- consumersManager.startConsumer(group);
- } catch (MQClientException e) {
- logger.error("RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), e);
- throw new RuntimeException("RocketMQ Consumer startup failed.", e);
- }
- }
-
- @Override
- protected void doStop() {
- consumersManager.stopConsumer(group);
- }
-
- public void setRetryTemplate(RetryTemplate retryTemplate) {
- this.retryTemplate = retryTemplate;
- }
-
- public void setRecoveryCallback(RecoveryCallback extends Object> recoveryCallback) {
- this.recoveryCallback = recoveryCallback;
- }
-
- protected class CloudStreamMessageListener implements MessageListener, RetryListener {
-
- private final InstrumentationManager instrumentationManager;
-
- CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
- this.instrumentationManager = instrumentationManager;
- }
-
- Acknowledgement consumeMessage(final List msgs) {
- boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
- try {
- if (enableRetry) {
- return RocketMQInboundChannelAdapter.this.retryTemplate.execute(
- (RetryCallback)context -> doSendMsgs(msgs, context),
- new RecoveryCallback() {
- @Override
- public Acknowledgement recover(RetryContext context) throws Exception {
- RocketMQInboundChannelAdapter.this.recoveryCallback.recover(context);
- if (ClassUtils.isAssignable(this.getClass(), MessageListenerConcurrently.class)) {
- return Acknowledgement.buildConcurrentlyInstance();
- } else {
- return Acknowledgement.buildOrderlyInstance();
- }
- }
- });
- } else {
- Acknowledgement result = doSendMsgs(msgs, null);
- instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination)
- .markConsumed();
- return result;
- }
- } catch (Exception e) {
- logger.error("Rocket Message hasn't been processed successfully. Caused by ", e);
- instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination)
- .markConsumedFailure();
- throw new RuntimeException("Rocket Message hasn't been processed successfully. Caused by ", e);
- }
- }
-
- private Acknowledgement doSendMsgs(final List msgs, RetryContext context) {
- List acknowledgements = new ArrayList<>();
- msgs.forEach(msg -> {
- String retryInfo = context == null ? "" : "retryCount-" + String.valueOf(context.getRetryCount()) + "|";
- logger.debug(retryInfo + "consuming msg:\n" + msg);
- logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
- Acknowledgement acknowledgement = new Acknowledgement();
- Message toChannel = MessageBuilder.withPayload(msg.getBody()).
- setHeaders(new RocketMQMessageHeaderAccessor().
- withAcknowledgment(acknowledgement).
- withTags(msg.getTags()).
- withKeys(msg.getKeys()).
- withFlag(msg.getFlag()).
- withRocketMessage(msg)
- ).build();
- acknowledgements.add(acknowledgement);
- RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
- });
- return acknowledgements.get(0);
- }
-
- @Override
- public boolean open(RetryContext context, RetryCallback callback) {
- return true;
- }
-
- @Override
- public void close(RetryContext context, RetryCallback callback,
- Throwable throwable) {
- if (throwable != null) {
- instrumentationManager.getConsumerInstrumentation(
- RocketMQInboundChannelAdapter.this.destination)
- .markConsumedFailure();
- } else {
- instrumentationManager.getConsumerInstrumentation(
- RocketMQInboundChannelAdapter.this.destination)
- .markConsumed();
- }
- }
-
- @Override
- public void onError(RetryContext context, RetryCallback callback,
- Throwable throwable) {
- }
- }
-
- protected class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements
- MessageListenerConcurrently {
-
- public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager) {
- super(instrumentationManager);
- }
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(final List msgs,
- ConsumeConcurrentlyContext context) {
- Acknowledgement acknowledgement = consumeMessage(msgs);
- context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel());
- return acknowledgement.getConsumeConcurrentlyStatus();
- }
- }
-
- protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements
- MessageListenerOrderly {
-
- public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager) {
- super(instrumentationManager);
- }
-
- @Override
- public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
- Acknowledgement acknowledgement = consumeMessage(msgs);
- context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
- return acknowledgement.getConsumeOrderlyStatus();
- }
-
- }
+ private static final Logger logger = LoggerFactory
+ .getLogger(RocketMQInboundChannelAdapter.class);
+
+ private ConsumerInstrumentation consumerInstrumentation;
+
+ private final ExtendedConsumerProperties consumerProperties;
+
+ private final String destination;
+
+ private final String group;
+
+ private final InstrumentationManager instrumentationManager;
+
+ private final ConsumersManager consumersManager;
+
+ private RetryTemplate retryTemplate;
+
+ private RecoveryCallback extends Object> recoveryCallback;
+
+ public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
+ ExtendedConsumerProperties consumerProperties,
+ String destination, String group,
+ InstrumentationManager instrumentationManager) {
+ this.consumersManager = consumersManager;
+ this.consumerProperties = consumerProperties;
+ this.destination = destination;
+ this.group = group;
+ this.instrumentationManager = instrumentationManager;
+ }
+
+ @Override
+ protected void doStart() {
+ if (!consumerProperties.getExtension().getEnabled()) {
+ return;
+ }
+
+ String tags = consumerProperties == null ? null
+ : consumerProperties.getExtension().getTags();
+ Boolean isOrderly = consumerProperties == null ? false
+ : consumerProperties.getExtension().getOrderly();
+
+ DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
+ destination, consumerProperties);
+
+ final CloudStreamMessageListener listener = isOrderly
+ ? new CloudStreamMessageListenerOrderly(instrumentationManager)
+ : new CloudStreamMessageListenerConcurrently(instrumentationManager);
+
+ if (retryTemplate != null) {
+ retryTemplate.registerListener(listener);
+ }
+
+ Set tagsSet = tags == null ? new HashSet<>()
+ : Arrays.stream(tags.split("\\|\\|")).map(String::trim)
+ .collect(Collectors.toSet());
+
+ consumerInstrumentation = instrumentationManager
+ .getConsumerInstrumentation(destination);
+ instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
+
+ try {
+ if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
+ consumer.subscribe(destination, MessageSelector
+ .bySql(consumerProperties.getExtension().getSql()));
+ }
+ else {
+ consumer.subscribe(destination, String.join(" || ", tagsSet));
+ }
+ consumerInstrumentation.markStartedSuccessfully();
+ }
+ catch (MQClientException e) {
+ consumerInstrumentation.markStartFailed(e);
+ logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
+ + e.getErrorMessage(), e);
+ throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
+ }
+
+ consumer.registerMessageListener(listener);
+
+ try {
+ consumersManager.startConsumer(group);
+ }
+ catch (MQClientException e) {
+ logger.error(
+ "RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(),
+ e);
+ throw new RuntimeException("RocketMQ Consumer startup failed.", e);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ consumersManager.stopConsumer(group);
+ }
+
+ public void setRetryTemplate(RetryTemplate retryTemplate) {
+ this.retryTemplate = retryTemplate;
+ }
+
+ public void setRecoveryCallback(RecoveryCallback extends Object> recoveryCallback) {
+ this.recoveryCallback = recoveryCallback;
+ }
+
+ protected class CloudStreamMessageListener implements MessageListener, RetryListener {
+
+ private final InstrumentationManager instrumentationManager;
+
+ CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
+ this.instrumentationManager = instrumentationManager;
+ }
+
+ Acknowledgement consumeMessage(final List msgs) {
+ boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
+ try {
+ if (enableRetry) {
+ return RocketMQInboundChannelAdapter.this.retryTemplate.execute(
+ (RetryCallback) context -> doSendMsgs(
+ msgs, context),
+ new RecoveryCallback() {
+ @Override
+ public Acknowledgement recover(RetryContext context)
+ throws Exception {
+ RocketMQInboundChannelAdapter.this.recoveryCallback
+ .recover(context);
+ if (ClassUtils.isAssignable(this.getClass(),
+ MessageListenerConcurrently.class)) {
+ return Acknowledgement
+ .buildConcurrentlyInstance();
+ }
+ else {
+ return Acknowledgement.buildOrderlyInstance();
+ }
+ }
+ });
+ }
+ else {
+ Acknowledgement result = doSendMsgs(msgs, null);
+ instrumentationManager
+ .getConsumerInstrumentation(
+ RocketMQInboundChannelAdapter.this.destination)
+ .markConsumed();
+ return result;
+ }
+ }
+ catch (Exception e) {
+ logger.error(
+ "Rocket Message hasn't been processed successfully. Caused by ",
+ e);
+ instrumentationManager
+ .getConsumerInstrumentation(
+ RocketMQInboundChannelAdapter.this.destination)
+ .markConsumedFailure();
+ throw new RuntimeException(
+ "Rocket Message hasn't been processed successfully. Caused by ",
+ e);
+ }
+ }
+
+ private Acknowledgement doSendMsgs(final List msgs,
+ RetryContext context) {
+ List acknowledgements = new ArrayList<>();
+ msgs.forEach(msg -> {
+ String retryInfo = context == null ? ""
+ : "retryCount-" + String.valueOf(context.getRetryCount()) + "|";
+ logger.debug(retryInfo + "consuming msg:\n" + msg);
+ logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
+ Acknowledgement acknowledgement = new Acknowledgement();
+ Message toChannel = MessageBuilder.withPayload(msg.getBody())
+ .setHeaders(new RocketMQMessageHeaderAccessor()
+ .withAcknowledgment(acknowledgement)
+ .withTags(msg.getTags()).withKeys(msg.getKeys())
+ .withFlag(msg.getFlag()).withRocketMessage(msg))
+ .build();
+ acknowledgements.add(acknowledgement);
+ RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
+ });
+ return acknowledgements.get(0);
+ }
+
+ @Override
+ public boolean open(RetryContext context,
+ RetryCallback callback) {
+ return true;
+ }
+
+ @Override
+ public void close(RetryContext context,
+ RetryCallback callback, Throwable throwable) {
+ if (throwable != null) {
+ instrumentationManager
+ .getConsumerInstrumentation(
+ RocketMQInboundChannelAdapter.this.destination)
+ .markConsumedFailure();
+ }
+ else {
+ instrumentationManager
+ .getConsumerInstrumentation(
+ RocketMQInboundChannelAdapter.this.destination)
+ .markConsumed();
+ }
+ }
+
+ @Override
+ public void onError(RetryContext context,
+ RetryCallback callback, Throwable throwable) {
+ }
+ }
+
+ protected class CloudStreamMessageListenerConcurrently
+ extends CloudStreamMessageListener implements MessageListenerConcurrently {
+
+ public CloudStreamMessageListenerConcurrently(
+ InstrumentationManager instrumentationManager) {
+ super(instrumentationManager);
+ }
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(final List msgs,
+ ConsumeConcurrentlyContext context) {
+ Acknowledgement acknowledgement = consumeMessage(msgs);
+ context.setDelayLevelWhenNextConsume(
+ acknowledgement.getConsumeConcurrentlyDelayLevel());
+ return acknowledgement.getConsumeConcurrentlyStatus();
+ }
+ }
+
+ protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
+ implements MessageListenerOrderly {
+
+ public CloudStreamMessageListenerOrderly(
+ InstrumentationManager instrumentationManager) {
+ super(instrumentationManager);
+ }
+
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List msgs,
+ ConsumeOrderlyContext context) {
+ Acknowledgement acknowledgement = consumeMessage(msgs);
+ context.setSuspendCurrentQueueTimeMillis(
+ (acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
+ return acknowledgement.getConsumeOrderlyStatus();
+ }
+
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
index a70283fbe..fb92aba70 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
@@ -26,106 +26,120 @@ import org.springframework.messaging.MessagingException;
*/
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
- private DefaultMQProducer producer;
-
- private ProducerInstrumentation producerInstrumentation;
-
- private final RocketMQProducerProperties producerProperties;
-
- private final String destination;
-
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
-
- private final InstrumentationManager instrumentationManager;
-
- protected volatile boolean running = false;
-
- public RocketMQMessageHandler(String destination, RocketMQProducerProperties producerProperties,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
- InstrumentationManager instrumentationManager) {
- this.destination = destination;
- this.producerProperties = producerProperties;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- this.instrumentationManager = instrumentationManager;
- }
-
- @Override
- public void start() {
- producer = new DefaultMQProducer(destination);
-
- producerInstrumentation = instrumentationManager.getProducerInstrumentation(destination);
- instrumentationManager.addHealthInstrumentation(producerInstrumentation);
-
- producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
-
- if (producerProperties.getMaxMessageSize() > 0) {
- producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
- }
-
- try {
- producer.start();
- producerInstrumentation.markStartedSuccessfully();
- } catch (MQClientException e) {
- producerInstrumentation.markStartFailed(e);
- logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
- throw new MessagingException(e.getMessage(), e);
- }
- running = true;
- }
-
- @Override
- public void stop() {
- if (producer != null) {
- producer.shutdown();
- }
- running = false;
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
- @Override
- protected void handleMessageInternal(org.springframework.messaging.Message> message) throws Exception {
- try {
- Message toSend;
- if (message.getPayload() instanceof byte[]) {
- toSend = new Message(destination, (byte[])message.getPayload());
- } else if (message.getPayload() instanceof String) {
- toSend = new Message(destination, ((String)message.getPayload()).getBytes());
- } else {
- throw new UnsupportedOperationException(
- "Payload class isn't supported: " + message.getPayload().getClass());
- }
- RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
- headerAccessor.setLeaveMutable(true);
- toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel());
- toSend.setTags(headerAccessor.getTags());
- toSend.setKeys(headerAccessor.getKeys());
- toSend.setFlag(headerAccessor.getFlag());
- for (Map.Entry entry : headerAccessor.getUserProperties().entrySet()) {
- toSend.putUserProperty(entry.getKey(), entry.getValue());
- }
-
- SendResult sendRes = producer.send(toSend);
-
- if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
- throw new MQClientException("message hasn't been sent", null);
- }
- if (message instanceof MutableMessage) {
- RocketMQMessageHeaderAccessor.putSendResult((MutableMessage)message, sendRes);
- }
- instrumentationManager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP,
- Instant.now().toEpochMilli());
- producerInstrumentation.markSent();
- } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException |
- UnsupportedOperationException e) {
- producerInstrumentation.markSentFailure();
- logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
- throw new MessagingException(e.getMessage(), e);
- }
-
- }
+ private DefaultMQProducer producer;
+
+ private ProducerInstrumentation producerInstrumentation;
+
+ private final RocketMQProducerProperties producerProperties;
+
+ private final String destination;
+
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ private final InstrumentationManager instrumentationManager;
+
+ protected volatile boolean running = false;
+
+ public RocketMQMessageHandler(String destination,
+ RocketMQProducerProperties producerProperties,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
+ InstrumentationManager instrumentationManager) {
+ this.destination = destination;
+ this.producerProperties = producerProperties;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ this.instrumentationManager = instrumentationManager;
+ }
+
+ @Override
+ public void start() {
+ producer = new DefaultMQProducer(destination);
+
+ producerInstrumentation = instrumentationManager
+ .getProducerInstrumentation(destination);
+ instrumentationManager.addHealthInstrumentation(producerInstrumentation);
+
+ producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
+
+ if (producerProperties.getMaxMessageSize() > 0) {
+ producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
+ }
+
+ try {
+ producer.start();
+ producerInstrumentation.markStartedSuccessfully();
+ }
+ catch (MQClientException e) {
+ producerInstrumentation.markStartFailed(e);
+ logger.error(
+ "RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
+ throw new MessagingException(e.getMessage(), e);
+ }
+ running = true;
+ }
+
+ @Override
+ public void stop() {
+ if (producer != null) {
+ producer.shutdown();
+ }
+ running = false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ @Override
+ protected void handleMessageInternal(org.springframework.messaging.Message> message)
+ throws Exception {
+ try {
+ Message toSend;
+ if (message.getPayload() instanceof byte[]) {
+ toSend = new Message(destination, (byte[]) message.getPayload());
+ }
+ else if (message.getPayload() instanceof String) {
+ toSend = new Message(destination,
+ ((String) message.getPayload()).getBytes());
+ }
+ else {
+ throw new UnsupportedOperationException("Payload class isn't supported: "
+ + message.getPayload().getClass());
+ }
+ RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(
+ message);
+ headerAccessor.setLeaveMutable(true);
+ toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel());
+ toSend.setTags(headerAccessor.getTags());
+ toSend.setKeys(headerAccessor.getKeys());
+ toSend.setFlag(headerAccessor.getFlag());
+ for (Map.Entry entry : headerAccessor.getUserProperties()
+ .entrySet()) {
+ toSend.putUserProperty(entry.getKey(), entry.getValue());
+ }
+
+ SendResult sendRes = producer.send(toSend);
+
+ if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
+ throw new MQClientException("message hasn't been sent", null);
+ }
+ if (message instanceof MutableMessage) {
+ RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
+ sendRes);
+ }
+ instrumentationManager.getRuntime().put(
+ RocketMQBinderConstants.LASTSEND_TIMESTAMP,
+ Instant.now().toEpochMilli());
+ producerInstrumentation.markSent();
+ }
+ catch (MQClientException | RemotingException | MQBrokerException
+ | InterruptedException | UnsupportedOperationException e) {
+ producerInstrumentation.markSentFailure();
+ logger.error(
+ "RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
+ throw new MessagingException(e.getMessage(), e);
+ }
+
+ }
}
\ No newline at end of file
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java
index 5b3f9efa9..83b54c3ed 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java
@@ -9,26 +9,26 @@ import com.codahale.metrics.MetricRegistry;
* @author Jim
*/
public class ConsumerGroupInstrumentation extends Instrumentation {
- private MetricRegistry metricRegistry;
+ private MetricRegistry metricRegistry;
- private AtomicBoolean delayedStart = new AtomicBoolean(false);
+ private AtomicBoolean delayedStart = new AtomicBoolean(false);
- public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
- super(name);
- this.metricRegistry = metricRegistry;
- }
+ public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
+ super(name);
+ this.metricRegistry = metricRegistry;
+ }
- public void markDelayedStart() {
- delayedStart.set(true);
- }
+ public void markDelayedStart() {
+ delayedStart.set(true);
+ }
- @Override
- public boolean isUp() {
- return started.get() || delayedStart.get();
- }
+ @Override
+ public boolean isUp() {
+ return started.get() || delayedStart.get();
+ }
- @Override
- public boolean isOutOfService() {
- return !started.get() && startException == null && !delayedStart.get();
- }
+ @Override
+ public boolean isOutOfService() {
+ return !started.get() && startException == null && !delayedStart.get();
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
index 7edc2b5f7..764307374 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
@@ -1,37 +1,40 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
+import static com.codahale.metrics.MetricRegistry.name;
+
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import static com.codahale.metrics.MetricRegistry.name;
-
/**
* @author juven.xuxb
* @author Jim
*/
public class ConsumerInstrumentation extends Instrumentation {
- private final Counter totalConsumed;
- private final Counter totalConsumedFailures;
- private final Meter consumedPerSecond;
- private final Meter consumedFailuresPerSecond;
+ private final Counter totalConsumed;
+ private final Counter totalConsumedFailures;
+ private final Meter consumedPerSecond;
+ private final Meter consumedFailuresPerSecond;
- public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
- super(baseMetricName);
- this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
- this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond"));
- this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures"));
- this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond"));
- }
+ public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
+ super(baseMetricName);
+ this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
+ this.consumedPerSecond = registry
+ .meter(name(baseMetricName, "consumedPerSecond"));
+ this.totalConsumedFailures = registry
+ .counter(name(baseMetricName, "totalConsumedFailures"));
+ this.consumedFailuresPerSecond = registry
+ .meter(name(baseMetricName, "consumedFailuresPerSecond"));
+ }
- public void markConsumed() {
- totalConsumed.inc();
- consumedPerSecond.mark();
- }
+ public void markConsumed() {
+ totalConsumed.inc();
+ consumedPerSecond.mark();
+ }
- public void markConsumedFailure() {
- totalConsumedFailures.inc();
- consumedFailuresPerSecond.mark();
- }
+ public void markConsumedFailure() {
+ totalConsumedFailures.inc();
+ consumedFailuresPerSecond.mark();
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java
index d91c15835..08ba4dd1f 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java
@@ -7,44 +7,44 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @author Jim
*/
public class Instrumentation {
- private final String name;
- protected final AtomicBoolean started = new AtomicBoolean(false);
- protected Exception startException = null;
-
- Instrumentation(String name) {
- this.name = name;
- }
-
- public boolean isDown() {
- return startException != null;
- }
-
- public boolean isUp() {
- return started.get();
- }
-
- public boolean isOutOfService() {
- return !started.get() && startException == null;
- }
-
- public void markStartedSuccessfully() {
- started.set(true);
- }
-
- public void markStartFailed(Exception e) {
- started.set(false);
- startException = e;
- }
-
- public String getName() {
- return name;
- }
-
- public boolean isStarted() {
- return started.get();
- }
-
- public Exception getStartException() {
- return startException;
- }
+ private final String name;
+ protected final AtomicBoolean started = new AtomicBoolean(false);
+ protected Exception startException = null;
+
+ Instrumentation(String name) {
+ this.name = name;
+ }
+
+ public boolean isDown() {
+ return startException != null;
+ }
+
+ public boolean isUp() {
+ return started.get();
+ }
+
+ public boolean isOutOfService() {
+ return !started.get() && startException == null;
+ }
+
+ public void markStartedSuccessfully() {
+ started.set(true);
+ }
+
+ public void markStartFailed(Exception e) {
+ started.set(false);
+ startException = e;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isStarted() {
+ return started.get();
+ }
+
+ public Exception getStartException() {
+ return startException;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
index 0e5d84a36..601356f2e 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
@@ -12,46 +12,51 @@ import com.codahale.metrics.MetricRegistry;
* @author Jim
*/
public class InstrumentationManager {
- private final MetricRegistry metricRegistry;
- private final Map runtime;
- private final Map producerInstrumentations = new HashMap<>();
- private final Map consumeInstrumentations = new HashMap<>();
- private final Map consumerGroupsInstrumentations = new HashMap<>();
-
- private final Map healthInstrumentations = new HashMap<>();
-
- public InstrumentationManager(MetricRegistry metricRegistry, Map runtime) {
- this.metricRegistry = metricRegistry;
- this.runtime = runtime;
- }
-
- public ProducerInstrumentation getProducerInstrumentation(String destination) {
- String key = "scs-rocketmq.producer." + destination;
- producerInstrumentations.putIfAbsent(key, new ProducerInstrumentation(metricRegistry, key));
- return producerInstrumentations.get(key);
- }
-
- public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
- String key = "scs-rocketmq.consumer." + destination;
- consumeInstrumentations.putIfAbsent(key, new ConsumerInstrumentation(metricRegistry, key));
- return consumeInstrumentations.get(key);
- }
-
- public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
- String key = "scs-rocketmq.consumerGroup." + group;
- consumerGroupsInstrumentations.putIfAbsent(key, new ConsumerGroupInstrumentation(metricRegistry, key));
- return consumerGroupsInstrumentations.get(key);
- }
-
- public Set getHealthInstrumentations() {
- return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toSet());
- }
-
- public void addHealthInstrumentation(Instrumentation instrumentation) {
- healthInstrumentations.put(instrumentation.getName(), instrumentation);
- }
-
- public Map getRuntime() {
- return runtime;
- }
+ private final MetricRegistry metricRegistry;
+ private final Map runtime;
+ private final Map producerInstrumentations = new HashMap<>();
+ private final Map consumeInstrumentations = new HashMap<>();
+ private final Map consumerGroupsInstrumentations = new HashMap<>();
+
+ private final Map healthInstrumentations = new HashMap<>();
+
+ public InstrumentationManager(MetricRegistry metricRegistry,
+ Map runtime) {
+ this.metricRegistry = metricRegistry;
+ this.runtime = runtime;
+ }
+
+ public ProducerInstrumentation getProducerInstrumentation(String destination) {
+ String key = "scs-rocketmq.producer." + destination;
+ producerInstrumentations.putIfAbsent(key,
+ new ProducerInstrumentation(metricRegistry, key));
+ return producerInstrumentations.get(key);
+ }
+
+ public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
+ String key = "scs-rocketmq.consumer." + destination;
+ consumeInstrumentations.putIfAbsent(key,
+ new ConsumerInstrumentation(metricRegistry, key));
+ return consumeInstrumentations.get(key);
+ }
+
+ public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
+ String key = "scs-rocketmq.consumerGroup." + group;
+ consumerGroupsInstrumentations.putIfAbsent(key,
+ new ConsumerGroupInstrumentation(metricRegistry, key));
+ return consumerGroupsInstrumentations.get(key);
+ }
+
+ public Set getHealthInstrumentations() {
+ return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue)
+ .collect(Collectors.toSet());
+ }
+
+ public void addHealthInstrumentation(Instrumentation instrumentation) {
+ healthInstrumentations.put(instrumentation.getName(), instrumentation);
+ }
+
+ public Map getRuntime() {
+ return runtime;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
index 7444d9ce7..1db95ccf5 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
@@ -1,37 +1,39 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
+import static com.codahale.metrics.MetricRegistry.name;
+
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import static com.codahale.metrics.MetricRegistry.name;
-
/**
* @author juven.xuxb
* @author Jim
*/
public class ProducerInstrumentation extends Instrumentation {
- private final Counter totalSent;
- private final Counter totalSentFailures;
- private final Meter sentPerSecond;
- private final Meter sentFailuresPerSecond;
+ private final Counter totalSent;
+ private final Counter totalSentFailures;
+ private final Meter sentPerSecond;
+ private final Meter sentFailuresPerSecond;
- public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
- super(baseMetricName);
- this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
- this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures"));
- this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
- this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond"));
- }
+ public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
+ super(baseMetricName);
+ this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
+ this.totalSentFailures = registry
+ .counter(name(baseMetricName, "totalSentFailures"));
+ this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
+ this.sentFailuresPerSecond = registry
+ .meter(name(baseMetricName, "sentFailuresPerSecond"));
+ }
- public void markSent() {
- totalSent.inc();
- sentPerSecond.mark();
- }
+ public void markSent() {
+ totalSent.inc();
+ sentPerSecond.mark();
+ }
- public void markSentFailure() {
- totalSentFailures.inc();
- sentFailuresPerSecond.mark();
- }
+ public void markSentFailure() {
+ totalSentFailures.inc();
+ sentFailuresPerSecond.mark();
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
index b960bc320..dd00e7d13 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
@@ -9,24 +9,24 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties {
- private String namesrvAddr;
+ private String namesrvAddr;
- private String logLevel = "ERROR";
+ private String logLevel = "ERROR";
- public String getNamesrvAddr() {
- return namesrvAddr;
- }
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
- public void setNamesrvAddr(String namesrvAddr) {
- this.namesrvAddr = namesrvAddr;
- }
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
- public String getLogLevel() {
- return logLevel;
- }
+ public String getLogLevel() {
+ return logLevel;
+ }
- public void setLogLevel(String logLevel) {
- this.logLevel = logLevel;
- }
+ public void setLogLevel(String logLevel) {
+ this.logLevel = logLevel;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java
index 4cefbd393..bd5c6d13f 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java
@@ -6,23 +6,23 @@ package org.springframework.cloud.stream.binder.rocketmq.properties;
*/
public class RocketMQBindingProperties {
- private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
+ private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
- private RocketMQProducerProperties producer = new RocketMQProducerProperties();
+ private RocketMQProducerProperties producer = new RocketMQProducerProperties();
- public RocketMQConsumerProperties getConsumer() {
- return consumer;
- }
+ public RocketMQConsumerProperties getConsumer() {
+ return consumer;
+ }
- public void setConsumer(RocketMQConsumerProperties consumer) {
- this.consumer = consumer;
- }
+ public void setConsumer(RocketMQConsumerProperties consumer) {
+ this.consumer = consumer;
+ }
- public RocketMQProducerProperties getProducer() {
- return producer;
- }
+ public RocketMQProducerProperties getProducer() {
+ return producer;
+ }
- public void setProducer(RocketMQProducerProperties producer) {
- this.producer = producer;
- }
+ public void setProducer(RocketMQProducerProperties producer) {
+ this.producer = producer;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
index bd169c795..2aa12dbef 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
@@ -19,9 +19,9 @@ public class RocketMQConsumerProperties {
private String tags;
/**
- * {@link MQPushConsumer#subscribe(String, MessageSelector)}
- * {@link MessageSelector#bySql(String)}
- */
+ * {@link MQPushConsumer#subscribe(String, MessageSelector)}
+ * {@link MessageSelector#bySql(String)}
+ */
private String sql;
/**
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java
index 8f3a30ef7..b4d507e3a 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java
@@ -12,53 +12,59 @@ import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
*/
@ConfigurationProperties("spring.cloud.stream.rocketmq")
public class RocketMQExtendedBindingProperties implements
- ExtendedBindingProperties {
+ ExtendedBindingProperties {
- private Map bindings = new HashMap<>();
+ private Map bindings = new HashMap<>();
- public Map getBindings() {
- return this.bindings;
- }
+ public Map getBindings() {
+ return this.bindings;
+ }
- public void setBindings(Map bindings) {
- this.bindings = bindings;
- }
+ public void setBindings(Map bindings) {
+ this.bindings = bindings;
+ }
- @Override
- public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
- if (bindings.containsKey(channelName)) {
- if (bindings.get(channelName).getConsumer() != null) {
- return bindings.get(channelName).getConsumer();
- } else {
- RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
- this.bindings.get(channelName).setConsumer(properties);
- return properties;
- }
- } else {
- RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
- RocketMQBindingProperties rbp = new RocketMQBindingProperties();
- rbp.setConsumer(properties);
- bindings.put(channelName, rbp);
- return properties;
- }
- }
+ @Override
+ public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(
+ String channelName) {
+ if (bindings.containsKey(channelName)) {
+ if (bindings.get(channelName).getConsumer() != null) {
+ return bindings.get(channelName).getConsumer();
+ }
+ else {
+ RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
+ this.bindings.get(channelName).setConsumer(properties);
+ return properties;
+ }
+ }
+ else {
+ RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
+ RocketMQBindingProperties rbp = new RocketMQBindingProperties();
+ rbp.setConsumer(properties);
+ bindings.put(channelName, rbp);
+ return properties;
+ }
+ }
- @Override
- public synchronized RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
- if (bindings.containsKey(channelName)) {
- if (bindings.get(channelName).getProducer() != null) {
- return bindings.get(channelName).getProducer();
- } else {
- RocketMQProducerProperties properties = new RocketMQProducerProperties();
- this.bindings.get(channelName).setProducer(properties);
- return properties;
- }
- } else {
- RocketMQProducerProperties properties = new RocketMQProducerProperties();
- RocketMQBindingProperties rbp = new RocketMQBindingProperties();
- rbp.setProducer(properties);
- bindings.put(channelName, rbp);
- return properties;
- }
- }
+ @Override
+ public synchronized RocketMQProducerProperties getExtendedProducerProperties(
+ String channelName) {
+ if (bindings.containsKey(channelName)) {
+ if (bindings.get(channelName).getProducer() != null) {
+ return bindings.get(channelName).getProducer();
+ }
+ else {
+ RocketMQProducerProperties properties = new RocketMQProducerProperties();
+ this.bindings.get(channelName).setProducer(properties);
+ return properties;
+ }
+ }
+ else {
+ RocketMQProducerProperties properties = new RocketMQProducerProperties();
+ RocketMQBindingProperties rbp = new RocketMQBindingProperties();
+ rbp.setProducer(properties);
+ bindings.put(channelName, rbp);
+ return properties;
+ }
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
index 4daed981b..828df00e9 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
@@ -8,28 +8,27 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
*/
public class RocketMQProducerProperties {
- private Boolean enabled = true;
+ private Boolean enabled = true;
- /**
- * Maximum allowed message size in bytes
- * {@link DefaultMQProducer#maxMessageSize}
- */
- private Integer maxMessageSize = 0;
+ /**
+ * Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}
+ */
+ private Integer maxMessageSize = 0;
- public Boolean getEnabled() {
- return enabled;
- }
+ public Boolean getEnabled() {
+ return enabled;
+ }
- public void setEnabled(Boolean enabled) {
- this.enabled = enabled;
- }
+ public void setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ }
- public Integer getMaxMessageSize() {
- return maxMessageSize;
- }
+ public Integer getMaxMessageSize() {
+ return maxMessageSize;
+ }
- public void setMaxMessageSize(Integer maxMessageSize) {
- this.maxMessageSize = maxMessageSize;
- }
+ public void setMaxMessageSize(Integer maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java
index b6a688efe..210209cda 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java
@@ -17,73 +17,71 @@ import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
* @author Timur Valiev
* @author Jim
*/
-public class RocketMQTopicProvisioner
- implements
- ProvisioningProvider,
- ExtendedProducerProperties> {
-
- private static final Logger logger = LoggerFactory.getLogger(RocketMQTopicProvisioner.class);
-
- @Override
- public ProducerDestination provisionProducerDestination(String name,
- ExtendedProducerProperties
- properties)
- throws ProvisioningException {
- checkTopic(name);
- return new RocketProducerDestination(name);
- }
-
- @Override
- public ConsumerDestination provisionConsumerDestination(String name, String group,
- ExtendedConsumerProperties
- properties)
- throws ProvisioningException {
- checkTopic(name);
- return new RocketConsumerDestination(name);
- }
-
- private void checkTopic(String topic) {
- try {
- Validators.checkTopic(topic);
- } catch (MQClientException e) {
- logger.error("topic check error: " + topic, e);
- throw new AssertionError(e); // Can't happen
- }
- }
-
- private static final class RocketProducerDestination implements ProducerDestination {
-
- private final String producerDestinationName;
-
- RocketProducerDestination(String destinationName) {
- this.producerDestinationName = destinationName;
- }
-
- @Override
- public String getName() {
- return producerDestinationName;
- }
-
- @Override
- public String getNameForPartition(int partition) {
- return producerDestinationName;
- }
-
- }
-
- private static final class RocketConsumerDestination implements ConsumerDestination {
-
- private final String consumerDestinationName;
-
- RocketConsumerDestination(String consumerDestinationName) {
- this.consumerDestinationName = consumerDestinationName;
- }
-
- @Override
- public String getName() {
- return this.consumerDestinationName;
- }
-
- }
+public class RocketMQTopicProvisioner implements
+ ProvisioningProvider, ExtendedProducerProperties> {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(RocketMQTopicProvisioner.class);
+
+ @Override
+ public ProducerDestination provisionProducerDestination(String name,
+ ExtendedProducerProperties properties)
+ throws ProvisioningException {
+ checkTopic(name);
+ return new RocketProducerDestination(name);
+ }
+
+ @Override
+ public ConsumerDestination provisionConsumerDestination(String name, String group,
+ ExtendedConsumerProperties properties)
+ throws ProvisioningException {
+ checkTopic(name);
+ return new RocketConsumerDestination(name);
+ }
+
+ private void checkTopic(String topic) {
+ try {
+ Validators.checkTopic(topic);
+ }
+ catch (MQClientException e) {
+ logger.error("topic check error: " + topic, e);
+ throw new AssertionError(e); // Can't happen
+ }
+ }
+
+ private static final class RocketProducerDestination implements ProducerDestination {
+
+ private final String producerDestinationName;
+
+ RocketProducerDestination(String destinationName) {
+ this.producerDestinationName = destinationName;
+ }
+
+ @Override
+ public String getName() {
+ return producerDestinationName;
+ }
+
+ @Override
+ public String getNameForPartition(int partition) {
+ return producerDestinationName;
+ }
+
+ }
+
+ private static final class RocketConsumerDestination implements ConsumerDestination {
+
+ private final String consumerDestinationName;
+
+ RocketConsumerDestination(String consumerDestinationName) {
+ this.consumerDestinationName = consumerDestinationName;
+ }
+
+ @Override
+ public String getName() {
+ return this.consumerDestinationName;
+ }
+
+ }
}