diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index ce25d7221..3cba7a73e 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -25,7 +25,6 @@ 1.0.0 2.16.0 4.3.1 - 3.2.6 @@ -221,13 +220,6 @@ ${project.version} - - - io.dropwizard.metrics - metrics-core - ${metrics.core} - - diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml index 4b534d4b8..7bf086c4f 100644 --- a/spring-cloud-stream-binder-rocketmq/pom.xml +++ b/spring-cloud-stream-binder-rocketmq/pom.xml @@ -21,13 +21,15 @@ - io.dropwizard.metrics - metrics-core + org.apache.rocketmq + rocketmq-client - org.apache.rocketmq - rocketmq-client + io.dropwizard.metrics + metrics-core + provided + true diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 4bed86afc..03009b436 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -42,76 +42,82 @@ import org.springframework.messaging.MessageHandler; * @author Jim */ public class RocketMQMessageChannelBinder extends - AbstractMessageChannelBinder, - ExtendedProducerProperties, RocketMQTopicProvisioner> - implements ExtendedPropertiesBinder { + AbstractMessageChannelBinder, ExtendedProducerProperties, RocketMQTopicProvisioner> + implements + ExtendedPropertiesBinder { - private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class); + private static final Logger logger = LoggerFactory + .getLogger(RocketMQMessageChannelBinder.class); - private final RocketMQExtendedBindingProperties extendedBindingProperties; - private final RocketMQTopicProvisioner rocketTopicProvisioner; - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - private final InstrumentationManager instrumentationManager; - private final ConsumersManager consumersManager; + private final RocketMQExtendedBindingProperties extendedBindingProperties; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private final InstrumentationManager instrumentationManager; + private final ConsumersManager consumersManager; - public RocketMQMessageChannelBinder(ConsumersManager consumersManager, - RocketMQExtendedBindingProperties extendedBindingProperties, - RocketMQTopicProvisioner provisioningProvider, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, - InstrumentationManager instrumentationManager) { - super(true, null, provisioningProvider); - this.consumersManager = consumersManager; - this.extendedBindingProperties = extendedBindingProperties; - this.rocketTopicProvisioner = provisioningProvider; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - this.instrumentationManager = instrumentationManager; - } + public RocketMQMessageChannelBinder(ConsumersManager consumersManager, + RocketMQExtendedBindingProperties extendedBindingProperties, + RocketMQTopicProvisioner provisioningProvider, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + InstrumentationManager instrumentationManager) { + super(true, null, provisioningProvider); + this.consumersManager = consumersManager; + this.extendedBindingProperties = extendedBindingProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.instrumentationManager = instrumentationManager; + } - @Override - protected MessageHandler createProducerMessageHandler(ProducerDestination destination, - ExtendedProducerProperties - producerProperties, - MessageChannel errorChannel) throws Exception { - if (producerProperties.getExtension().getEnabled()) { - return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(), - rocketBinderConfigurationProperties, instrumentationManager); - } else { - throw new RuntimeException( - "Binding for channel " + destination.getName() + "has been disabled, message can't be delivered"); - } - } + @Override + protected MessageHandler createProducerMessageHandler(ProducerDestination destination, + ExtendedProducerProperties producerProperties, + MessageChannel errorChannel) throws Exception { + if (producerProperties.getExtension().getEnabled()) { + return new RocketMQMessageHandler(destination.getName(), + producerProperties.getExtension(), + rocketBinderConfigurationProperties, instrumentationManager); + } + else { + throw new RuntimeException("Binding for channel " + destination.getName() + + " has been disabled, message can't be delivered"); + } + } - @Override - protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, - ExtendedConsumerProperties - consumerProperties) - throws Exception { - if (group == null || "".equals(group)) { - throw new RuntimeException("'group' must be configured for channel + " + destination.getName()); - } + @Override + protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, + String group, + ExtendedConsumerProperties consumerProperties) + throws Exception { + if (group == null || "".equals(group)) { + throw new RuntimeException( + "'group' must be configured for channel + " + destination.getName()); + } - RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager, - consumerProperties, destination.getName(), group, instrumentationManager); + RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( + consumersManager, consumerProperties, destination.getName(), group, + instrumentationManager); - ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, - consumerProperties); - if (consumerProperties.getMaxAttempts() > 1) { - rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties)); - rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer()); - } else { - rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel()); - } + ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, + group, consumerProperties); + if (consumerProperties.getMaxAttempts() > 1) { + rocketInboundChannelAdapter + .setRetryTemplate(buildRetryTemplate(consumerProperties)); + rocketInboundChannelAdapter + .setRecoveryCallback(errorInfrastructure.getRecoverer()); + } + else { + rocketInboundChannelAdapter + .setErrorChannel(errorInfrastructure.getErrorChannel()); + } - return rocketInboundChannelAdapter; - } + return rocketInboundChannelAdapter; + } - @Override - public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { - return extendedBindingProperties.getExtendedConsumerProperties(channelName); - } + @Override + public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { + return extendedBindingProperties.getExtendedConsumerProperties(channelName); + } - @Override - public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { - return extendedBindingProperties.getExtendedProducerProperties(channelName); - } + @Override + public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { + return extendedBindingProperties.getExtendedProducerProperties(channelName); + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java index 756924beb..baf8e7004 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java @@ -20,11 +20,10 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.endpoint.AbstractEndpoint; - -import com.codahale.metrics.MetricRegistry; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; /** * @author Timur Valiev @@ -32,8 +31,8 @@ import com.codahale.metrics.MetricRegistry; */ public class RocketMQBinderEndpoint extends AbstractEndpoint> { - private MetricRegistry metricRegistry = new MetricRegistry(); - private Map runtime = new ConcurrentHashMap<>(); + @Autowired(required = false) + private InstrumentationManager instrumentationManager; public RocketMQBinderEndpoint() { super(ENDPOINT_ID); @@ -42,17 +41,15 @@ public class RocketMQBinderEndpoint extends AbstractEndpoint @Override public Map invoke() { Map result = new HashMap<>(); - result.put("metrics", metricRegistry().getMetrics()); - result.put("runtime", runtime()); + if (instrumentationManager != null) { + result.put("metrics", instrumentationManager.getMetricRegistry()); + result.put("runtime", instrumentationManager.getRuntime()); + } + else { + result.put("warning", + "please add metrics-core dependency, we use it for metrics"); + } return result; } - public MetricRegistry metricRegistry() { - return metricRegistry; - } - - public Map runtime() { - return runtime; - } - } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java index f23463acf..cb2c2efea 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java @@ -16,6 +16,7 @@ package org.springframework.cloud.stream.binder.rocketmq.actuator; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; @@ -27,40 +28,45 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM */ public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { - private final InstrumentationManager instrumentationManager; - - public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) { - this.instrumentationManager = instrumentationManager; - } + @Autowired(required = false) + private InstrumentationManager instrumentationManager; @Override protected void doHealthCheck(Health.Builder builder) throws Exception { int upCount = 0, outOfServiceCount = 0; - for (Instrumentation instrumentation : instrumentationManager - .getHealthInstrumentations()) { - if (instrumentation.isUp()) { - upCount++; + if (instrumentationManager != null) { + for (Instrumentation instrumentation : instrumentationManager + .getHealthInstrumentations()) { + if (instrumentation.isUp()) { + upCount++; + } + else if (instrumentation.isOutOfService()) { + upCount++; + } } - else if (instrumentation.isOutOfService()) { - upCount++; + if (upCount == instrumentationManager.getHealthInstrumentations().size()) { + builder.up(); + return; } - } - if (upCount == instrumentationManager.getHealthInstrumentations().size()) { - builder.up(); - return; - } - else if (outOfServiceCount == instrumentationManager.getHealthInstrumentations() - .size()) { - builder.outOfService(); - return; - } - builder.down(); + else if (outOfServiceCount == instrumentationManager + .getHealthInstrumentations().size()) { + builder.outOfService(); + return; + } + builder.down(); - for (Instrumentation instrumentation : instrumentationManager - .getHealthInstrumentations()) { - if (!instrumentation.isStarted()) { - builder.withException(instrumentation.getStartException()); + for (Instrumentation instrumentation : instrumentationManager + .getHealthInstrumentations()) { + if (!instrumentation.isStarted()) { + builder.withException(instrumentation.getStartException()); + } } } + else { + builder.down(); + builder.withDetail("warning", + "please add metrics-core dependency, we use it for metrics"); + } + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java index f2bb68cff..95abaeeb5 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -33,38 +33,46 @@ import org.springframework.context.annotation.Configuration; * @author Jim */ @Configuration -@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class}) +@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class, + RocketMQExtendedBindingProperties.class }) public class RocketMQBinderAutoConfiguration { - private final RocketMQExtendedBindingProperties extendedBindingProperties; + private final RocketMQExtendedBindingProperties extendedBindingProperties; - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - @Autowired - public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { - this.extendedBindingProperties = extendedBindingProperties; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel()); - } + @Autowired(required = false) + private InstrumentationManager instrumentationManager; - @Bean - public RocketMQTopicProvisioner provisioningProvider() { - return new RocketMQTopicProvisioner(); - } + @Autowired + public RocketMQBinderAutoConfiguration( + RocketMQExtendedBindingProperties extendedBindingProperties, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { + this.extendedBindingProperties = extendedBindingProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, + this.rocketBinderConfigurationProperties.getLogLevel()); + } - @Bean - public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider, - InstrumentationManager instrumentationManager, - ConsumersManager consumersManager) { - RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties, - provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager); - return binder; - } + @Bean + public RocketMQTopicProvisioner provisioningProvider() { + return new RocketMQTopicProvisioner(); + } - @Bean - public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) { - return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties); - } + @Bean + public RocketMQMessageChannelBinder rocketMessageChannelBinder( + RocketMQTopicProvisioner provisioningProvider, + ConsumersManager consumersManager) { + RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( + consumersManager, extendedBindingProperties, provisioningProvider, + rocketBinderConfigurationProperties, instrumentationManager); + return binder; + } + + @Bean + public ConsumersManager consumersManager() { + return new ConsumersManager(instrumentationManager, + rocketBinderConfigurationProperties); + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java index a53865096..4b6026211 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java @@ -17,7 +17,9 @@ package org.springframework.cloud.stream.binder.rocketmq.config; import org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration; +import org.springframework.boot.actuate.endpoint.Endpoint; import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -29,6 +31,7 @@ import org.springframework.context.annotation.Configuration; */ @Configuration @AutoConfigureAfter(EndpointAutoConfiguration.class) +@ConditionalOnClass(Endpoint.class) public class RocketMQBinderEndpointAutoConfiguration { @Bean @@ -37,16 +40,14 @@ public class RocketMQBinderEndpointAutoConfiguration { } @Bean - public RocketMQBinderHealthIndicator rocketBinderHealthIndicator( - InstrumentationManager instrumentationManager) { - return new RocketMQBinderHealthIndicator(instrumentationManager); + public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() { + return new RocketMQBinderHealthIndicator(); } @Bean - public InstrumentationManager instrumentationManager( - RocketMQBinderEndpoint rocketBinderEndpoint) { - return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), - rocketBinderEndpoint.runtime()); + @ConditionalOnClass(name = "com.codahale.metrics.Counter") + public InstrumentationManager instrumentationManager() { + return new InstrumentationManager(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java index 6b9445f93..c56f060c1 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java @@ -38,85 +38,98 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsu */ public class ConsumersManager { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private final Map consumerGroups = new HashMap<>(); - private final Map started = new HashMap<>(); - private final Map, ExtendedConsumerProperties> propertiesMap - = new HashMap<>(); - private final InstrumentationManager instrumentationManager; - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - - public ConsumersManager(InstrumentationManager instrumentationManager, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { - this.instrumentationManager = instrumentationManager; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - } - - public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic, - ExtendedConsumerProperties consumerProperties) { - propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties); - ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group); - instrumentationManager.addHealthInstrumentation(instrumentation); - - if (consumerGroups.containsKey(group)) { - return consumerGroups.get(group); - } - - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); - consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - consumerGroups.put(group, consumer); - started.put(group, false); - consumer.setConsumeThreadMax(consumerProperties.getConcurrency()); - consumer.setConsumeThreadMin(consumerProperties.getConcurrency()); - if (consumerProperties.getExtension().getBroadcasting()) { - consumer.setMessageModel(MessageModel.BROADCASTING); - } - logger.info("RocketMQ consuming for SCS group {} created", group); - return consumer; - } - - public synchronized void startConsumers() throws MQClientException { - for (String group : getConsumerGroups()) { - start(group); - } - } - - public synchronized void startConsumer(String group) throws MQClientException { - start(group); - } - - public synchronized void stopConsumer(String group) { - stop(group); - } - - private void stop(String group) { - if (consumerGroups.get(group) != null) { - consumerGroups.get(group).shutdown(); - started.put(group, false); - } - } - - private synchronized void start(String group) throws MQClientException { - if (started.get(group)) { - return; - } - ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation( - group); - instrumentationManager.addHealthInstrumentation(groupInstrumentation); - try { - consumerGroups.get(group).start(); - started.put(group, true); - groupInstrumentation.markStartedSuccessfully(); - } catch (MQClientException e) { - groupInstrumentation.markStartFailed(e); - logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e); - throw e; - } - } - - public synchronized Set getConsumerGroups() { - return consumerGroups.keySet(); - } + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private InstrumentationManager instrumentationManager; + + private final Map consumerGroups = new HashMap<>(); + private final Map started = new HashMap<>(); + private final Map, ExtendedConsumerProperties> propertiesMap = new HashMap<>(); + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + + public ConsumersManager(InstrumentationManager instrumentationManager, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { + this.instrumentationManager = instrumentationManager; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + } + + public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, + String topic, + ExtendedConsumerProperties consumerProperties) { + propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), + consumerProperties); + if (instrumentationManager != null) { + ConsumerGroupInstrumentation instrumentation = instrumentationManager + .getConsumerGroupInstrumentation(group); + instrumentationManager.addHealthInstrumentation(instrumentation); + } + + if (consumerGroups.containsKey(group)) { + return consumerGroups.get(group); + } + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); + consumerGroups.put(group, consumer); + started.put(group, false); + consumer.setConsumeThreadMax(consumerProperties.getConcurrency()); + consumer.setConsumeThreadMin(consumerProperties.getConcurrency()); + if (consumerProperties.getExtension().getBroadcasting()) { + consumer.setMessageModel(MessageModel.BROADCASTING); + } + logger.info("RocketMQ consuming for SCS group {} created", group); + return consumer; + } + + public synchronized void startConsumers() throws MQClientException { + for (String group : getConsumerGroups()) { + start(group); + } + } + + public synchronized void startConsumer(String group) throws MQClientException { + start(group); + } + + public synchronized void stopConsumer(String group) { + stop(group); + } + + private void stop(String group) { + if (consumerGroups.get(group) != null) { + consumerGroups.get(group).shutdown(); + started.put(group, false); + } + } + + private synchronized void start(String group) throws MQClientException { + if (started.get(group)) { + return; + } + ConsumerGroupInstrumentation groupInstrumentation = null; + if (instrumentationManager != null) { + groupInstrumentation = instrumentationManager + .getConsumerGroupInstrumentation(group); + instrumentationManager.addHealthInstrumentation(groupInstrumentation); + } + try { + consumerGroups.get(group).start(); + started.put(group, true); + if (groupInstrumentation != null) { + groupInstrumentation.markStartedSuccessfully(); + } + } + catch (MQClientException e) { + if (groupInstrumentation != null) { + groupInstrumentation.markStartFailed(e); + } + logger.error("RocketMQ Consumer hasn't been started. Caused by " + + e.getErrorMessage(), e); + throw e; + } + } + + public synchronized Set getConsumerGroups() { + return consumerGroups.keySet(); + } } - diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index 8c884da84..4558eb950 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -62,14 +62,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { private ConsumerInstrumentation consumerInstrumentation; + private InstrumentationManager instrumentationManager; + private final ExtendedConsumerProperties consumerProperties; private final String destination; private final String group; - private final InstrumentationManager instrumentationManager; - private final ConsumersManager consumersManager; private RetryTemplate retryTemplate; @@ -89,21 +89,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { @Override protected void doStart() { - if (!consumerProperties.getExtension().getEnabled()) { + if (consumerProperties == null + || !consumerProperties.getExtension().getEnabled()) { return; } - String tags = consumerProperties == null ? null - : consumerProperties.getExtension().getTags(); - Boolean isOrderly = consumerProperties == null ? false - : consumerProperties.getExtension().getOrderly(); + String tags = consumerProperties.getExtension().getTags(); + Boolean isOrderly = consumerProperties.getExtension().getOrderly(); DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties); final CloudStreamMessageListener listener = isOrderly - ? new CloudStreamMessageListenerOrderly(instrumentationManager) - : new CloudStreamMessageListenerConcurrently(instrumentationManager); + ? new CloudStreamMessageListenerOrderly() + : new CloudStreamMessageListenerConcurrently(); if (retryTemplate != null) { retryTemplate.registerListener(listener); @@ -116,9 +115,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } } - consumerInstrumentation = instrumentationManager - .getConsumerInstrumentation(destination); - instrumentationManager.addHealthInstrumentation(consumerInstrumentation); + if (instrumentationManager != null) { + consumerInstrumentation = instrumentationManager + .getConsumerInstrumentation(destination); + instrumentationManager.addHealthInstrumentation(consumerInstrumentation); + } try { if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { @@ -129,10 +130,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { consumer.subscribe(destination, org.apache.commons.lang3.StringUtils.join(tagsSet, " || ")); } - consumerInstrumentation.markStartedSuccessfully(); + if (consumerInstrumentation != null) { + consumerInstrumentation.markStartedSuccessfully(); + } } catch (MQClientException e) { - consumerInstrumentation.markStartFailed(e); + if (consumerInstrumentation != null) { + consumerInstrumentation.markStartFailed(e); + } logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e); throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); @@ -166,19 +171,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { protected class CloudStreamMessageListener implements MessageListener, RetryListener { - private final InstrumentationManager instrumentationManager; - - CloudStreamMessageListener(InstrumentationManager instrumentationManager) { - this.instrumentationManager = instrumentationManager; - } - Acknowledgement consumeMessage(final List msgs) { boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; try { if (enableRetry) { - return RocketMQInboundChannelAdapter.this.retryTemplate.execute( - // (RetryCallback) - new RetryCallback() { + return RocketMQInboundChannelAdapter.this.retryTemplate + .execute(new RetryCallback() { @Override public Acknowledgement doWithRetry(RetryContext context) throws Exception { @@ -203,10 +201,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } else { Acknowledgement result = doSendMsgs(msgs, null); - instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumed(); + if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) { + RocketMQInboundChannelAdapter.this.instrumentationManager + .getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumed(); + } return result; } } @@ -214,10 +214,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { logger.error( "RocketMQ Message hasn't been processed successfully. Caused by ", e); - instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumedFailure(); + if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) { + RocketMQInboundChannelAdapter.this.instrumentationManager + .getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumedFailure(); + } throw new RuntimeException( "RocketMQ Message hasn't been processed successfully. Caused by ", e); @@ -254,14 +256,17 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { @Override public void close(RetryContext context, RetryCallback callback, Throwable throwable) { + if (RocketMQInboundChannelAdapter.this.instrumentationManager == null) { + return; + } if (throwable != null) { - instrumentationManager + RocketMQInboundChannelAdapter.this.instrumentationManager .getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.destination) .markConsumedFailure(); } else { - instrumentationManager + RocketMQInboundChannelAdapter.this.instrumentationManager .getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.destination) .markConsumed(); @@ -277,11 +282,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { protected class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements MessageListenerConcurrently { - public CloudStreamMessageListenerConcurrently( - InstrumentationManager instrumentationManager) { - super(instrumentationManager); - } - @Override public ConsumeConcurrentlyStatus consumeMessage(final List msgs, ConsumeConcurrentlyContext context) { @@ -295,11 +295,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements MessageListenerOrderly { - public CloudStreamMessageListenerOrderly( - InstrumentationManager instrumentationManager) { - super(instrumentationManager); - } - @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 88273e8c1..0ab7386c1 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -45,14 +45,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private ProducerInstrumentation producerInstrumentation; + private InstrumentationManager instrumentationManager; + private final RocketMQProducerProperties producerProperties; private final String destination; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - private final InstrumentationManager instrumentationManager; - protected volatile boolean running = false; public RocketMQMessageHandler(String destination, @@ -69,9 +69,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li public void start() { producer = new DefaultMQProducer(destination); - producerInstrumentation = instrumentationManager - .getProducerInstrumentation(destination); - instrumentationManager.addHealthInstrumentation(producerInstrumentation); + if (instrumentationManager != null) { + producerInstrumentation = instrumentationManager + .getProducerInstrumentation(destination); + instrumentationManager.addHealthInstrumentation(producerInstrumentation); + } producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); @@ -81,10 +83,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li try { producer.start(); - producerInstrumentation.markStartedSuccessfully(); + if (producerInstrumentation != null) { + producerInstrumentation.markStartedSuccessfully(); + } } catch (MQClientException e) { - producerInstrumentation.markStartFailed(e); + if (producerInstrumentation != null) { + producerInstrumentation.markStartFailed(e); + } logger.error( "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); throw new MessagingException(e.getMessage(), e); @@ -142,14 +148,18 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message, sendRes); } - instrumentationManager.getRuntime().put( - RocketMQBinderConstants.LASTSEND_TIMESTAMP, - System.currentTimeMillis()); - producerInstrumentation.markSent(); + if (instrumentationManager != null) { + instrumentationManager.getRuntime().put( + RocketMQBinderConstants.LASTSEND_TIMESTAMP, + System.currentTimeMillis()); + producerInstrumentation.markSent(); + } } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException | UnsupportedOperationException e) { - producerInstrumentation.markSentFailure(); + if (producerInstrumentation != null) { + producerInstrumentation.markSentFailure(); + } logger.error( "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); throw new MessagingException(e.getMessage(), e); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java index dea848590..8d047e858 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java @@ -28,20 +28,14 @@ import com.codahale.metrics.MetricRegistry; * @author Jim */ public class InstrumentationManager { - private final MetricRegistry metricRegistry; - private final Map runtime; + private final MetricRegistry metricRegistry = new MetricRegistry(); + private final Map runtime = new HashMap<>(); private final Map producerInstrumentations = new HashMap<>(); private final Map consumeInstrumentations = new HashMap<>(); private final Map consumerGroupsInstrumentations = new HashMap<>(); private final Map healthInstrumentations = new HashMap<>(); - public InstrumentationManager(MetricRegistry metricRegistry, - Map runtime) { - this.metricRegistry = metricRegistry; - this.runtime = runtime; - } - public ProducerInstrumentation getProducerInstrumentation(String destination) { String key = "scs-rocketmq.producer." + destination; ProducerInstrumentation producerInstrumentation = producerInstrumentations @@ -86,4 +80,8 @@ public class InstrumentationManager { public Map getRuntime() { return runtime; } + + public MetricRegistry getMetricRegistry() { + return metricRegistry; + } }