From b35e7d78773526bc32dfc011924a1b65a10d8692 Mon Sep 17 00:00:00 2001 From: zkzlx Date: Mon, 23 Aug 2021 11:41:11 +0800 Subject: [PATCH] fixed Instrumentation --- .../inbound/RocketMQInboundChannelAdapter.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java index 61c0eedd3..d27836b98 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java @@ -78,7 +78,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport || !extendedConsumerProperties.getExtension().getEnabled()) { return; } - Instrumentation instrumentation = new Instrumentation(topic, this); try { super.onInit(); if (this.retryTemplate != null) { @@ -130,18 +129,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport return ConsumeConcurrentlyStatus.RECONSUME_LATER; }, () -> ConsumeConcurrentlyStatus.CONSUME_SUCCESS)); } - instrumentation.markStartedSuccessfully(); } catch (Exception e) { - instrumentation.markStartFailed(e); log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage()); throw new MessagingException(MessageBuilder.withPayload( "DefaultMQPushConsumer init failed, Caused by " + e.getMessage()) .build(), e); } - finally { - InstrumentationManager.addHealthInstrumentation(instrumentation); - } } /** @@ -188,16 +182,21 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport || !extendedConsumerProperties.getExtension().getEnabled()) { return; } + Instrumentation instrumentation = new Instrumentation(topic, this); try { pushConsumer.subscribe(topic, RocketMQUtils.getMessageSelector( extendedConsumerProperties.getExtension().getSubscription())); pushConsumer.start(); + instrumentation.markStartedSuccessfully(); } catch (Exception e) { + instrumentation.markStartFailed(e); log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage()); throw new MessagingException(MessageBuilder.withPayload( "DefaultMQPushConsumer init failed, Caused by " + e.getMessage()) .build(), e); + }finally { + InstrumentationManager.addHealthInstrumentation(instrumentation); } }