|
|
|
@ -78,7 +78,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport
|
|
|
|
|
|| !extendedConsumerProperties.getExtension().getEnabled()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
Instrumentation instrumentation = new Instrumentation(topic, this);
|
|
|
|
|
try {
|
|
|
|
|
super.onInit();
|
|
|
|
|
if (this.retryTemplate != null) {
|
|
|
|
@ -130,18 +129,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport
|
|
|
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
|
|
|
|
}, () -> ConsumeConcurrentlyStatus.CONSUME_SUCCESS));
|
|
|
|
|
}
|
|
|
|
|
instrumentation.markStartedSuccessfully();
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
instrumentation.markStartFailed(e);
|
|
|
|
|
log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
|
|
|
|
|
throw new MessagingException(MessageBuilder.withPayload(
|
|
|
|
|
"DefaultMQPushConsumer init failed, Caused by " + e.getMessage())
|
|
|
|
|
.build(), e);
|
|
|
|
|
}
|
|
|
|
|
finally {
|
|
|
|
|
InstrumentationManager.addHealthInstrumentation(instrumentation);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -188,16 +182,21 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport
|
|
|
|
|
|| !extendedConsumerProperties.getExtension().getEnabled()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
Instrumentation instrumentation = new Instrumentation(topic, this);
|
|
|
|
|
try {
|
|
|
|
|
pushConsumer.subscribe(topic, RocketMQUtils.getMessageSelector(
|
|
|
|
|
extendedConsumerProperties.getExtension().getSubscription()));
|
|
|
|
|
pushConsumer.start();
|
|
|
|
|
instrumentation.markStartedSuccessfully();
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
instrumentation.markStartFailed(e);
|
|
|
|
|
log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
|
|
|
|
|
throw new MessagingException(MessageBuilder.withPayload(
|
|
|
|
|
"DefaultMQPushConsumer init failed, Caused by " + e.getMessage())
|
|
|
|
|
.build(), e);
|
|
|
|
|
}finally {
|
|
|
|
|
InstrumentationManager.addHealthInstrumentation(instrumentation);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|