make metrics-core as provided dependency and refactor for 1.x branch

pull/129/head
fangjian0423 6 years ago
parent 71f4295170
commit 2c7ab714fb

@ -25,7 +25,6 @@
<alicloud.context.version>1.0.0</alicloud.context.version>
<aliyun.sdk.edas.version>2.16.0</aliyun.sdk.edas.version>
<rocketmq.version>4.3.1</rocketmq.version>
<metrics.core>3.2.6</metrics.core>
</properties>
<dependencyManagement>
@ -221,13 +220,6 @@
<version>${project.version}</version>
</dependency>
<!-- Third dependencies -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.core}</version>
</dependency>
<!-- Testing Dependencies -->

@ -21,13 +21,15 @@
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>

@ -42,76 +42,82 @@ import org.springframework.messaging.MessageHandler;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>,
ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner>
implements
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class);
private static final Logger logger = LoggerFactory
.getLogger(RocketMQMessageChannelBinder.class);
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQTopicProvisioner rocketTopicProvisioner;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager;
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager;
public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQTopicProvisioner provisioningProvider,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) {
super(true, null, provisioningProvider);
this.consumersManager = consumersManager;
this.extendedBindingProperties = extendedBindingProperties;
this.rocketTopicProvisioner = provisioningProvider;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.instrumentationManager = instrumentationManager;
}
public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQTopicProvisioner provisioningProvider,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) {
super(true, null, provisioningProvider);
this.consumersManager = consumersManager;
this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.instrumentationManager = instrumentationManager;
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties>
producerProperties,
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(),
rocketBinderConfigurationProperties, instrumentationManager);
} else {
throw new RuntimeException(
"Binding for channel " + destination.getName() + "has been disabled, message can't be delivered");
}
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
return new RocketMQMessageHandler(destination.getName(),
producerProperties.getExtension(),
rocketBinderConfigurationProperties, instrumentationManager);
}
else {
throw new RuntimeException("Binding for channel " + destination.getName()
+ " has been disabled, message can't be delivered");
}
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties>
consumerProperties)
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException("'group' must be configured for channel + " + destination.getName());
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
String group,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties)
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException(
"'group' must be configured for channel + " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager,
consumerProperties, destination.getName(), group, instrumentationManager);
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
consumersManager, consumerProperties, destination.getName(), group,
instrumentationManager);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group,
consumerProperties);
if (consumerProperties.getMaxAttempts() > 1) {
rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));
rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
} else {
rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
}
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
group, consumerProperties);
if (consumerProperties.getMaxAttempts() > 1) {
rocketInboundChannelAdapter
.setRetryTemplate(buildRetryTemplate(consumerProperties));
rocketInboundChannelAdapter
.setRecoveryCallback(errorInfrastructure.getRecoverer());
}
else {
rocketInboundChannelAdapter
.setErrorChannel(errorInfrastructure.getErrorChannel());
}
return rocketInboundChannelAdapter;
}
return rocketInboundChannelAdapter;
}
@Override
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
return extendedBindingProperties.getExtendedProducerProperties(channelName);
}
@Override
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
return extendedBindingProperties.getExtendedProducerProperties(channelName);
}
}

@ -20,11 +20,10 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
import com.codahale.metrics.MetricRegistry;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
/**
* @author Timur Valiev
@ -32,8 +31,8 @@ import com.codahale.metrics.MetricRegistry;
*/
public class RocketMQBinderEndpoint extends AbstractEndpoint<Map<String, Object>> {
private MetricRegistry metricRegistry = new MetricRegistry();
private Map<String, Object> runtime = new ConcurrentHashMap<>();
@Autowired(required = false)
private InstrumentationManager instrumentationManager;
public RocketMQBinderEndpoint() {
super(ENDPOINT_ID);
@ -42,17 +41,15 @@ public class RocketMQBinderEndpoint extends AbstractEndpoint<Map<String, Object>
@Override
public Map<String, Object> invoke() {
Map<String, Object> result = new HashMap<>();
result.put("metrics", metricRegistry().getMetrics());
result.put("runtime", runtime());
if (instrumentationManager != null) {
result.put("metrics", instrumentationManager.getMetricRegistry());
result.put("runtime", instrumentationManager.getRuntime());
}
else {
result.put("warning",
"please add metrics-core dependency, we use it for metrics");
}
return result;
}
public MetricRegistry metricRegistry() {
return metricRegistry;
}
public Map<String, Object> runtime() {
return runtime;
}
}

@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
@ -27,40 +28,45 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM
*/
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
private final InstrumentationManager instrumentationManager;
public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
this.instrumentationManager = instrumentationManager;
}
@Autowired(required = false)
private InstrumentationManager instrumentationManager;
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
int upCount = 0, outOfServiceCount = 0;
for (Instrumentation instrumentation : instrumentationManager
.getHealthInstrumentations()) {
if (instrumentation.isUp()) {
upCount++;
if (instrumentationManager != null) {
for (Instrumentation instrumentation : instrumentationManager
.getHealthInstrumentations()) {
if (instrumentation.isUp()) {
upCount++;
}
else if (instrumentation.isOutOfService()) {
upCount++;
}
}
else if (instrumentation.isOutOfService()) {
upCount++;
if (upCount == instrumentationManager.getHealthInstrumentations().size()) {
builder.up();
return;
}
}
if (upCount == instrumentationManager.getHealthInstrumentations().size()) {
builder.up();
return;
}
else if (outOfServiceCount == instrumentationManager.getHealthInstrumentations()
.size()) {
builder.outOfService();
return;
}
builder.down();
else if (outOfServiceCount == instrumentationManager
.getHealthInstrumentations().size()) {
builder.outOfService();
return;
}
builder.down();
for (Instrumentation instrumentation : instrumentationManager
.getHealthInstrumentations()) {
if (!instrumentation.isStarted()) {
builder.withException(instrumentation.getStartException());
for (Instrumentation instrumentation : instrumentationManager
.getHealthInstrumentations()) {
if (!instrumentation.isStarted()) {
builder.withException(instrumentation.getStartException());
}
}
}
else {
builder.down();
builder.withDetail("warning",
"please add metrics-core dependency, we use it for metrics");
}
}
}

@ -33,38 +33,46 @@ import org.springframework.context.annotation.Configuration;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Configuration
@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class})
@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class,
RocketMQExtendedBindingProperties.class })
public class RocketMQBinderAutoConfiguration {
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@Autowired
public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel());
}
@Autowired(required = false)
private InstrumentationManager instrumentationManager;
@Bean
public RocketMQTopicProvisioner provisioningProvider() {
return new RocketMQTopicProvisioner();
}
@Autowired
public RocketMQBinderAutoConfiguration(
RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL,
this.rocketBinderConfigurationProperties.getLogLevel());
}
@Bean
public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
InstrumentationManager instrumentationManager,
ConsumersManager consumersManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties,
provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager);
return binder;
}
@Bean
public RocketMQTopicProvisioner provisioningProvider() {
return new RocketMQTopicProvisioner();
}
@Bean
public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) {
return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties);
}
@Bean
public RocketMQMessageChannelBinder rocketMessageChannelBinder(
RocketMQTopicProvisioner provisioningProvider,
ConsumersManager consumersManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
consumersManager, extendedBindingProperties, provisioningProvider,
rocketBinderConfigurationProperties, instrumentationManager);
return binder;
}
@Bean
public ConsumersManager consumersManager() {
return new ConsumersManager(instrumentationManager,
rocketBinderConfigurationProperties);
}
}

@ -17,7 +17,9 @@
package org.springframework.cloud.stream.binder.rocketmq.config;
import org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration;
import org.springframework.boot.actuate.endpoint.Endpoint;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
@ -29,6 +31,7 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
@AutoConfigureAfter(EndpointAutoConfiguration.class)
@ConditionalOnClass(Endpoint.class)
public class RocketMQBinderEndpointAutoConfiguration {
@Bean
@ -37,16 +40,14 @@ public class RocketMQBinderEndpointAutoConfiguration {
}
@Bean
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(
InstrumentationManager instrumentationManager) {
return new RocketMQBinderHealthIndicator(instrumentationManager);
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() {
return new RocketMQBinderHealthIndicator();
}
@Bean
public InstrumentationManager instrumentationManager(
RocketMQBinderEndpoint rocketBinderEndpoint) {
return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(),
rocketBinderEndpoint.runtime());
@ConditionalOnClass(name = "com.codahale.metrics.Counter")
public InstrumentationManager instrumentationManager() {
return new InstrumentationManager();
}
}

@ -38,85 +38,98 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsu
*/
public class ConsumersManager {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
private final Map<String, Boolean> started = new HashMap<>();
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap
= new HashMap<>();
private final InstrumentationManager instrumentationManager;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
public ConsumersManager(InstrumentationManager instrumentationManager,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.instrumentationManager = instrumentationManager;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
}
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties);
ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(instrumentation);
if (consumerGroups.containsKey(group)) {
return consumerGroups.get(group);
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
consumerGroups.put(group, consumer);
started.put(group, false);
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
if (consumerProperties.getExtension().getBroadcasting()) {
consumer.setMessageModel(MessageModel.BROADCASTING);
}
logger.info("RocketMQ consuming for SCS group {} created", group);
return consumer;
}
public synchronized void startConsumers() throws MQClientException {
for (String group : getConsumerGroups()) {
start(group);
}
}
public synchronized void startConsumer(String group) throws MQClientException {
start(group);
}
public synchronized void stopConsumer(String group) {
stop(group);
}
private void stop(String group) {
if (consumerGroups.get(group) != null) {
consumerGroups.get(group).shutdown();
started.put(group, false);
}
}
private synchronized void start(String group) throws MQClientException {
if (started.get(group)) {
return;
}
ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation(
group);
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
try {
consumerGroups.get(group).start();
started.put(group, true);
groupInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
groupInstrumentation.markStartFailed(e);
logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
throw e;
}
}
public synchronized Set<String> getConsumerGroups() {
return consumerGroups.keySet();
}
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private InstrumentationManager instrumentationManager;
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
private final Map<String, Boolean> started = new HashMap<>();
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap = new HashMap<>();
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
public ConsumersManager(InstrumentationManager instrumentationManager,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.instrumentationManager = instrumentationManager;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
}
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group,
String topic,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic),
consumerProperties);
if (instrumentationManager != null) {
ConsumerGroupInstrumentation instrumentation = instrumentationManager
.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(instrumentation);
}
if (consumerGroups.containsKey(group)) {
return consumerGroups.get(group);
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
consumerGroups.put(group, consumer);
started.put(group, false);
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
if (consumerProperties.getExtension().getBroadcasting()) {
consumer.setMessageModel(MessageModel.BROADCASTING);
}
logger.info("RocketMQ consuming for SCS group {} created", group);
return consumer;
}
public synchronized void startConsumers() throws MQClientException {
for (String group : getConsumerGroups()) {
start(group);
}
}
public synchronized void startConsumer(String group) throws MQClientException {
start(group);
}
public synchronized void stopConsumer(String group) {
stop(group);
}
private void stop(String group) {
if (consumerGroups.get(group) != null) {
consumerGroups.get(group).shutdown();
started.put(group, false);
}
}
private synchronized void start(String group) throws MQClientException {
if (started.get(group)) {
return;
}
ConsumerGroupInstrumentation groupInstrumentation = null;
if (instrumentationManager != null) {
groupInstrumentation = instrumentationManager
.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
}
try {
consumerGroups.get(group).start();
started.put(group, true);
if (groupInstrumentation != null) {
groupInstrumentation.markStartedSuccessfully();
}
}
catch (MQClientException e) {
if (groupInstrumentation != null) {
groupInstrumentation.markStartFailed(e);
}
logger.error("RocketMQ Consumer hasn't been started. Caused by "
+ e.getErrorMessage(), e);
throw e;
}
}
public synchronized Set<String> getConsumerGroups() {
return consumerGroups.keySet();
}
}

@ -62,14 +62,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private ConsumerInstrumentation consumerInstrumentation;
private InstrumentationManager instrumentationManager;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
private final String destination;
private final String group;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager;
private RetryTemplate retryTemplate;
@ -89,21 +89,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
@Override
protected void doStart() {
if (!consumerProperties.getExtension().getEnabled()) {
if (consumerProperties == null
|| !consumerProperties.getExtension().getEnabled()) {
return;
}
String tags = consumerProperties == null ? null
: consumerProperties.getExtension().getTags();
Boolean isOrderly = consumerProperties == null ? false
: consumerProperties.getExtension().getOrderly();
String tags = consumerProperties.getExtension().getTags();
Boolean isOrderly = consumerProperties.getExtension().getOrderly();
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
destination, consumerProperties);
final CloudStreamMessageListener listener = isOrderly
? new CloudStreamMessageListenerOrderly(instrumentationManager)
: new CloudStreamMessageListenerConcurrently(instrumentationManager);
? new CloudStreamMessageListenerOrderly()
: new CloudStreamMessageListenerConcurrently();
if (retryTemplate != null) {
retryTemplate.registerListener(listener);
@ -116,9 +115,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
}
}
consumerInstrumentation = instrumentationManager
.getConsumerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
if (instrumentationManager != null) {
consumerInstrumentation = instrumentationManager
.getConsumerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
}
try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
@ -129,10 +130,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
consumer.subscribe(destination,
org.apache.commons.lang3.StringUtils.join(tagsSet, " || "));
}
consumerInstrumentation.markStartedSuccessfully();
if (consumerInstrumentation != null) {
consumerInstrumentation.markStartedSuccessfully();
}
}
catch (MQClientException e) {
consumerInstrumentation.markStartFailed(e);
if (consumerInstrumentation != null) {
consumerInstrumentation.markStartFailed(e);
}
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
+ e.getErrorMessage(), e);
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
@ -166,19 +171,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
private final InstrumentationManager instrumentationManager;
CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
this.instrumentationManager = instrumentationManager;
}
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
try {
if (enableRetry) {
return RocketMQInboundChannelAdapter.this.retryTemplate.execute(
// (RetryCallback<Acknowledgement, Exception>)
new RetryCallback<Acknowledgement, Exception>() {
return RocketMQInboundChannelAdapter.this.retryTemplate
.execute(new RetryCallback<Acknowledgement, Exception>() {
@Override
public Acknowledgement doWithRetry(RetryContext context)
throws Exception {
@ -203,10 +201,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
}
else {
Acknowledgement result = doSendMsgs(msgs, null);
instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) {
RocketMQInboundChannelAdapter.this.instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
}
return result;
}
}
@ -214,10 +214,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
logger.error(
"RocketMQ Message hasn't been processed successfully. Caused by ",
e);
instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
if (RocketMQInboundChannelAdapter.this.instrumentationManager != null) {
RocketMQInboundChannelAdapter.this.instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
}
throw new RuntimeException(
"RocketMQ Message hasn't been processed successfully. Caused by ",
e);
@ -254,14 +256,17 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
@Override
public <T, E extends Throwable> void close(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
if (RocketMQInboundChannelAdapter.this.instrumentationManager == null) {
return;
}
if (throwable != null) {
instrumentationManager
RocketMQInboundChannelAdapter.this.instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
}
else {
instrumentationManager
RocketMQInboundChannelAdapter.this.instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
@ -277,11 +282,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListenerConcurrently
extends CloudStreamMessageListener implements MessageListenerConcurrently {
public CloudStreamMessageListenerConcurrently(
InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
@ -295,11 +295,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
implements MessageListenerOrderly {
public CloudStreamMessageListenerOrderly(
InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {

@ -45,14 +45,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private ProducerInstrumentation producerInstrumentation;
private InstrumentationManager instrumentationManager;
private final RocketMQProducerProperties producerProperties;
private final String destination;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
protected volatile boolean running = false;
public RocketMQMessageHandler(String destination,
@ -69,9 +69,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
public void start() {
producer = new DefaultMQProducer(destination);
producerInstrumentation = instrumentationManager
.getProducerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(producerInstrumentation);
if (instrumentationManager != null) {
producerInstrumentation = instrumentationManager
.getProducerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(producerInstrumentation);
}
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
@ -81,10 +83,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
try {
producer.start();
producerInstrumentation.markStartedSuccessfully();
if (producerInstrumentation != null) {
producerInstrumentation.markStartedSuccessfully();
}
}
catch (MQClientException e) {
producerInstrumentation.markStartFailed(e);
if (producerInstrumentation != null) {
producerInstrumentation.markStartFailed(e);
}
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
@ -142,14 +148,18 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
sendRes);
}
instrumentationManager.getRuntime().put(
RocketMQBinderConstants.LASTSEND_TIMESTAMP,
System.currentTimeMillis());
producerInstrumentation.markSent();
if (instrumentationManager != null) {
instrumentationManager.getRuntime().put(
RocketMQBinderConstants.LASTSEND_TIMESTAMP,
System.currentTimeMillis());
producerInstrumentation.markSent();
}
}
catch (MQClientException | RemotingException | MQBrokerException
| InterruptedException | UnsupportedOperationException e) {
producerInstrumentation.markSentFailure();
if (producerInstrumentation != null) {
producerInstrumentation.markSentFailure();
}
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);

@ -28,20 +28,14 @@ import com.codahale.metrics.MetricRegistry;
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class InstrumentationManager {
private final MetricRegistry metricRegistry;
private final Map<String, Object> runtime;
private final MetricRegistry metricRegistry = new MetricRegistry();
private final Map<String, Object> runtime = new HashMap<>();
private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>();
private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>();
private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>();
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>();
public InstrumentationManager(MetricRegistry metricRegistry,
Map<String, Object> runtime) {
this.metricRegistry = metricRegistry;
this.runtime = runtime;
}
public ProducerInstrumentation getProducerInstrumentation(String destination) {
String key = "scs-rocketmq.producer." + destination;
ProducerInstrumentation producerInstrumentation = producerInstrumentations
@ -86,4 +80,8 @@ public class InstrumentationManager {
public Map<String, Object> getRuntime() {
return runtime;
}
public MetricRegistry getMetricRegistry() {
return metricRegistry;
}
}

Loading…
Cancel
Save