From 7d039f47425f5d32c647112d6d6a5524cbdb7f53 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 28 Nov 2018 17:13:59 +0800 Subject: [PATCH] make metrics-core as provided dependency and refactor --- pom.xml | 1 - .../rocketmq-example/pom.xml | 6 ++ spring-cloud-stream-binder-rocketmq/pom.xml | 11 ++- .../RocketMQMessageChannelBinder.java | 6 +- .../actuator/RocketMQBinderEndpoint.java | 28 +++--- .../RocketMQBinderHealthIndicator.java | 42 ++++---- .../RocketMQBinderAutoConfiguration.java | 7 +- ...cketMQBinderEndpointAutoConfiguration.java | 15 +-- .../rocketmq/consuming/ConsumersManager.java | 30 ++++-- .../RocketMQInboundChannelAdapter.java | 99 +++++++++---------- .../integration/RocketMQMessageHandler.java | 30 +++--- .../metrics/InstrumentationManager.java | 18 ++-- 12 files changed, 161 insertions(+), 132 deletions(-) diff --git a/pom.xml b/pom.xml index a2a7aa0fd..b0532ec39 100644 --- a/pom.xml +++ b/pom.xml @@ -100,7 +100,6 @@ spring-cloud-alicloud-oss spring-cloud-alicloud-acm spring-cloud-alicloud-ans - spring-cloud-starter-bus-rocketmq diff --git a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml index bec837c36..618fb069d 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml @@ -31,6 +31,12 @@ org.springframework.boot spring-boot-starter-actuator + + + io.dropwizard.metrics + metrics-core + + diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml index 438d27d47..9449c430b 100644 --- a/spring-cloud-stream-binder-rocketmq/pom.xml +++ b/spring-cloud-stream-binder-rocketmq/pom.xml @@ -22,13 +22,16 @@ - io.dropwizard.metrics - metrics-core + org.apache.rocketmq + rocketmq-client - org.apache.rocketmq - rocketmq-client + io.dropwizard.metrics + metrics-core + 4.0.3 + provided + true diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index cb3d5cad4..a43b189c8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -34,7 +34,6 @@ public class RocketMQMessageChannelBinder extends .getLogger(RocketMQMessageChannelBinder.class); private final RocketMQExtendedBindingProperties extendedBindingProperties; - private final RocketMQTopicProvisioner rocketTopicProvisioner; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final InstrumentationManager instrumentationManager; private final ConsumersManager consumersManager; @@ -47,7 +46,6 @@ public class RocketMQMessageChannelBinder extends super(null, provisioningProvider); this.consumersManager = consumersManager; this.extendedBindingProperties = extendedBindingProperties; - this.rocketTopicProvisioner = provisioningProvider; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.instrumentationManager = instrumentationManager; } @@ -63,7 +61,7 @@ public class RocketMQMessageChannelBinder extends } else { throw new RuntimeException("Binding for channel " + destination.getName() - + "has been disabled, message can't be delivered"); + + " has been disabled, message can't be delivered"); } } @@ -74,7 +72,7 @@ public class RocketMQMessageChannelBinder extends throws Exception { if (group == null || "".equals(group)) { throw new RuntimeException( - "'group' must be configured for channel + " + destination.getName()); + "'group must be configured for channel + " + destination.getName()); } RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java index 5a7f57f53..5c4f9e677 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java @@ -4,12 +4,11 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.endpoint.annotation.Endpoint; import org.springframework.boot.actuate.endpoint.annotation.ReadOperation; - -import com.codahale.metrics.MetricRegistry; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; /** * @author Timur Valiev @@ -18,23 +17,22 @@ import com.codahale.metrics.MetricRegistry; @Endpoint(id = ENDPOINT_ID) public class RocketMQBinderEndpoint { - private MetricRegistry metricRegistry = new MetricRegistry(); - private Map runtime = new ConcurrentHashMap<>(); + @Autowired(required = false) + private InstrumentationManager instrumentationManager; @ReadOperation public Map invoke() { Map result = new HashMap<>(); - result.put("metrics", metricRegistry().getMetrics()); - result.put("runtime", runtime()); + if (instrumentationManager != null) { + result.put("metrics", + instrumentationManager.getMetricRegistry().getMetrics()); + result.put("runtime", instrumentationManager.getRuntime()); + } + else { + result.put("warning", + "please add metrics-core dependency, we use it for metrics"); + } return result; } - public MetricRegistry metricRegistry() { - return metricRegistry; - } - - public Map runtime() { - return runtime; - } - } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java index 82f4ab5ff..b517a6f78 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java @@ -1,5 +1,6 @@ package org.springframework.cloud.stream.binder.rocketmq.actuator; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; @@ -11,28 +12,33 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM */ public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { - private final InstrumentationManager instrumentationManager; - - public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) { - this.instrumentationManager = instrumentationManager; - } + @Autowired(required = false) + private InstrumentationManager instrumentationManager; @Override protected void doHealthCheck(Health.Builder builder) throws Exception { - if (instrumentationManager.getHealthInstrumentations().stream() - .allMatch(Instrumentation::isUp)) { - builder.up(); - return; + if (instrumentationManager != null) { + if (instrumentationManager.getHealthInstrumentations().stream() + .allMatch(Instrumentation::isUp)) { + builder.up(); + return; + } + if (instrumentationManager.getHealthInstrumentations().stream() + .allMatch(Instrumentation::isOutOfService)) { + builder.outOfService(); + return; + } + builder.down(); + instrumentationManager.getHealthInstrumentations().stream() + .filter(instrumentation -> !instrumentation.isStarted()) + .forEach(instrumentation1 -> builder + .withException(instrumentation1.getStartException())); } - if (instrumentationManager.getHealthInstrumentations().stream() - .allMatch(Instrumentation::isOutOfService)) { - builder.outOfService(); - return; + else { + builder.down(); + builder.withDetail("warning", + "please add metrics-core dependency, we use it for metrics"); } - builder.down(); - instrumentationManager.getHealthInstrumentations().stream() - .filter(instrumentation -> !instrumentation.isStarted()) - .forEach(instrumentation1 -> builder - .withException(instrumentation1.getStartException())); + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java index 713edb5cf..dda8d4393 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -25,6 +25,9 @@ public class RocketMQBinderAutoConfiguration { private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + @Autowired(required = false) + private InstrumentationManager instrumentationManager; + @Autowired public RocketMQBinderAutoConfiguration( RocketMQExtendedBindingProperties extendedBindingProperties, @@ -43,7 +46,6 @@ public class RocketMQBinderAutoConfiguration { @Bean public RocketMQMessageChannelBinder rocketMessageChannelBinder( RocketMQTopicProvisioner provisioningProvider, - InstrumentationManager instrumentationManager, ConsumersManager consumersManager) { RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( consumersManager, extendedBindingProperties, provisioningProvider, @@ -52,8 +54,7 @@ public class RocketMQBinderAutoConfiguration { } @Bean - public ConsumersManager consumersManager( - InstrumentationManager instrumentationManager) { + public ConsumersManager consumersManager() { return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties); } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java index 1b0bd0f51..7e40cd5b0 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java @@ -1,7 +1,9 @@ package org.springframework.cloud.stream.binder.rocketmq.config; import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration; +import org.springframework.boot.actuate.endpoint.annotation.Endpoint; import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -13,6 +15,7 @@ import org.springframework.context.annotation.Configuration; */ @Configuration @AutoConfigureAfter(EndpointAutoConfiguration.class) +@ConditionalOnClass(Endpoint.class) public class RocketMQBinderEndpointAutoConfiguration { @Bean @@ -21,16 +24,14 @@ public class RocketMQBinderEndpointAutoConfiguration { } @Bean - public RocketMQBinderHealthIndicator rocketBinderHealthIndicator( - InstrumentationManager instrumentationManager) { - return new RocketMQBinderHealthIndicator(instrumentationManager); + public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() { + return new RocketMQBinderHealthIndicator(); } @Bean - public InstrumentationManager instrumentationManager( - RocketMQBinderEndpoint rocketBinderEndpoint) { - return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), - rocketBinderEndpoint.runtime()); + @ConditionalOnClass(name = "com.codahale.metrics.Counter") + public InstrumentationManager instrumentationManager() { + return new InstrumentationManager(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java index 4184e3f41..2dd425b03 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java @@ -3,6 +3,7 @@ package org.springframework.cloud.stream.binder.rocketmq.consuming; import java.util.AbstractMap; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -27,9 +28,10 @@ public class ConsumersManager { private final Map consumerGroups = new HashMap<>(); private final Map started = new HashMap<>(); private final Map, ExtendedConsumerProperties> propertiesMap = new HashMap<>(); - private final InstrumentationManager instrumentationManager; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private InstrumentationManager instrumentationManager; + public ConsumersManager(InstrumentationManager instrumentationManager, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { this.instrumentationManager = instrumentationManager; @@ -41,9 +43,12 @@ public class ConsumersManager { ExtendedConsumerProperties consumerProperties) { propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties); - ConsumerGroupInstrumentation instrumentation = instrumentationManager - .getConsumerGroupInstrumentation(group); - instrumentationManager.addHealthInstrumentation(instrumentation); + + Optional.ofNullable(instrumentationManager).ifPresent(manager -> { + ConsumerGroupInstrumentation instrumentation = manager + .getConsumerGroupInstrumentation(group); + instrumentationManager.addHealthInstrumentation(instrumentation); + }); if (consumerGroups.containsKey(group)) { return consumerGroups.get(group); @@ -87,16 +92,23 @@ public class ConsumersManager { if (started.get(group)) { return; } - ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager - .getConsumerGroupInstrumentation(group); - instrumentationManager.addHealthInstrumentation(groupInstrumentation); + + ConsumerGroupInstrumentation groupInstrumentation = null; + if (Optional.ofNullable(instrumentationManager).isPresent()) { + groupInstrumentation = instrumentationManager + .getConsumerGroupInstrumentation(group); + instrumentationManager.addHealthInstrumentation(groupInstrumentation); + } + try { consumerGroups.get(group).start(); started.put(group, true); - groupInstrumentation.markStartedSuccessfully(); + Optional.ofNullable(groupInstrumentation) + .ifPresent(g -> g.markStartedSuccessfully()); } catch (MQClientException e) { - groupInstrumentation.markStartFailed(e); + Optional.ofNullable(groupInstrumentation) + .ifPresent(g -> g.markStartFailed(e)); logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e); throw e; diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index 84c2898e2..1831dd7b5 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -48,20 +49,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { private ConsumerInstrumentation consumerInstrumentation; + private InstrumentationManager instrumentationManager; + + private RetryTemplate retryTemplate; + + private RecoveryCallback recoveryCallback; + private final ExtendedConsumerProperties consumerProperties; private final String destination; private final String group; - private final InstrumentationManager instrumentationManager; - private final ConsumersManager consumersManager; - private RetryTemplate retryTemplate; - - private RecoveryCallback recoveryCallback; - public RocketMQInboundChannelAdapter(ConsumersManager consumersManager, ExtendedConsumerProperties consumerProperties, String destination, String group, @@ -75,21 +76,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { @Override protected void doStart() { - if (!consumerProperties.getExtension().getEnabled()) { + if (consumerProperties == null + || !consumerProperties.getExtension().getEnabled()) { return; } - String tags = consumerProperties == null ? null - : consumerProperties.getExtension().getTags(); - Boolean isOrderly = consumerProperties == null ? false - : consumerProperties.getExtension().getOrderly(); + String tags = consumerProperties.getExtension().getTags(); + Boolean isOrderly = consumerProperties.getExtension().getOrderly(); DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties); final CloudStreamMessageListener listener = isOrderly - ? new CloudStreamMessageListenerOrderly(instrumentationManager) - : new CloudStreamMessageListenerConcurrently(instrumentationManager); + ? new CloudStreamMessageListenerOrderly() + : new CloudStreamMessageListenerConcurrently(); if (retryTemplate != null) { retryTemplate.registerListener(listener); @@ -99,9 +99,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { : Arrays.stream(tags.split("\\|\\|")).map(String::trim) .collect(Collectors.toSet()); - consumerInstrumentation = instrumentationManager - .getConsumerInstrumentation(destination); - instrumentationManager.addHealthInstrumentation(consumerInstrumentation); + Optional.ofNullable(instrumentationManager).ifPresent(manager -> { + consumerInstrumentation = manager.getConsumerInstrumentation(destination); + manager.addHealthInstrumentation(consumerInstrumentation); + }); try { if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { @@ -111,10 +112,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { else { consumer.subscribe(destination, String.join(" || ", tagsSet)); } - consumerInstrumentation.markStartedSuccessfully(); + Optional.ofNullable(consumerInstrumentation) + .ifPresent(c -> c.markStartedSuccessfully()); } catch (MQClientException e) { - consumerInstrumentation.markStartFailed(e); + Optional.ofNullable(consumerInstrumentation) + .ifPresent(c -> c.markStartFailed(e)); logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e); throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); @@ -148,12 +151,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { protected class CloudStreamMessageListener implements MessageListener, RetryListener { - private final InstrumentationManager instrumentationManager; - - CloudStreamMessageListener(InstrumentationManager instrumentationManager) { - this.instrumentationManager = instrumentationManager; - } - Acknowledgement consumeMessage(final List msgs) { boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; try { @@ -180,10 +177,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } else { Acknowledgement result = doSendMsgs(msgs, null); - instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumed(); + Optional.ofNullable( + RocketMQInboundChannelAdapter.this.instrumentationManager) + .ifPresent(manager -> { + manager.getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumed(); + }); return result; } } @@ -191,10 +191,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { logger.error( "RocketMQ Message hasn't been processed successfully. Caused by ", e); - instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumedFailure(); + Optional.ofNullable( + RocketMQInboundChannelAdapter.this.instrumentationManager) + .ifPresent(manager -> { + manager.getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumedFailure(); + }); throw new RuntimeException( "RocketMQ Message hasn't been processed successfully. Caused by ", e); @@ -232,16 +235,22 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { public void close(RetryContext context, RetryCallback callback, Throwable throwable) { if (throwable != null) { - instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumedFailure(); + Optional.ofNullable( + RocketMQInboundChannelAdapter.this.instrumentationManager) + .ifPresent(manager -> { + manager.getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumedFailure(); + }); } else { - instrumentationManager - .getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumed(); + Optional.ofNullable( + RocketMQInboundChannelAdapter.this.instrumentationManager) + .ifPresent(manager -> { + manager.getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumed(); + }); } } @@ -254,11 +263,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { protected class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements MessageListenerConcurrently { - public CloudStreamMessageListenerConcurrently( - InstrumentationManager instrumentationManager) { - super(instrumentationManager); - } - @Override public ConsumeConcurrentlyStatus consumeMessage(final List msgs, ConsumeConcurrentlyContext context) { @@ -272,11 +276,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements MessageListenerOrderly { - public CloudStreamMessageListenerOrderly( - InstrumentationManager instrumentationManager) { - super(instrumentationManager); - } - @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index fb92aba70..1af261390 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -2,6 +2,7 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; import java.time.Instant; import java.util.Map; +import java.util.Optional; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -30,14 +31,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private ProducerInstrumentation producerInstrumentation; + private InstrumentationManager instrumentationManager; + private final RocketMQProducerProperties producerProperties; private final String destination; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - private final InstrumentationManager instrumentationManager; - protected volatile boolean running = false; public RocketMQMessageHandler(String destination, @@ -54,9 +55,10 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li public void start() { producer = new DefaultMQProducer(destination); - producerInstrumentation = instrumentationManager - .getProducerInstrumentation(destination); - instrumentationManager.addHealthInstrumentation(producerInstrumentation); + Optional.ofNullable(instrumentationManager).ifPresent(manager -> { + producerInstrumentation = manager.getProducerInstrumentation(destination); + manager.addHealthInstrumentation(producerInstrumentation); + }); producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); @@ -66,10 +68,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li try { producer.start(); - producerInstrumentation.markStartedSuccessfully(); + Optional.ofNullable(producerInstrumentation) + .ifPresent(p -> p.markStartedSuccessfully()); } catch (MQClientException e) { - producerInstrumentation.markStartFailed(e); + Optional.ofNullable(producerInstrumentation) + .ifPresent(p -> p.markStartFailed(e)); logger.error( "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); throw new MessagingException(e.getMessage(), e); @@ -127,14 +131,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message, sendRes); } - instrumentationManager.getRuntime().put( - RocketMQBinderConstants.LASTSEND_TIMESTAMP, - Instant.now().toEpochMilli()); - producerInstrumentation.markSent(); + Optional.ofNullable(instrumentationManager).ifPresent(manager -> { + manager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP, + Instant.now().toEpochMilli()); + }); + Optional.ofNullable(producerInstrumentation).ifPresent(p -> p.markSent()); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException | UnsupportedOperationException e) { - producerInstrumentation.markSentFailure(); + Optional.ofNullable(producerInstrumentation) + .ifPresent(p -> p.markSentFailure()); logger.error( "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); throw new MessagingException(e.getMessage(), e); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java index 599401c32..811ba01d7 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java @@ -3,6 +3,7 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer; @@ -15,22 +16,17 @@ import com.codahale.metrics.MetricRegistry; * @author Jim */ public class InstrumentationManager { - private final MetricRegistry metricRegistry; - private final Map runtime; + + private final MetricRegistry metricRegistry = new MetricRegistry(); + private final Map runtime = new ConcurrentHashMap<>(); + private final Map producerInstrumentations = new HashMap<>(); private final Map consumeInstrumentations = new HashMap<>(); private final Map consumerGroupsInstrumentations = new HashMap<>(); private final Map healthInstrumentations = new HashMap<>(); - public InstrumentationManager(MetricRegistry metricRegistry, - Map runtime) { - this.metricRegistry = metricRegistry; - this.runtime = runtime; - } - public ProducerInstrumentation getProducerInstrumentation(String destination) { - String key = Producer.PREFIX + destination; producerInstrumentations.putIfAbsent(key, new ProducerInstrumentation(metricRegistry, key)); @@ -63,4 +59,8 @@ public class InstrumentationManager { public Map getRuntime() { return runtime; } + + public MetricRegistry getMetricRegistry() { + return metricRegistry; + } }