diff --git a/pom.xml b/pom.xml
index a2a7aa0fd..b0532ec39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,7 +100,6 @@
spring-cloud-alicloud-oss
spring-cloud-alicloud-acm
spring-cloud-alicloud-ans
- spring-cloud-starter-bus-rocketmq
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml
index bec837c36..618fb069d 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/pom.xml
@@ -31,6 +31,12 @@
org.springframework.boot
spring-boot-starter-actuator
+
+
+ io.dropwizard.metrics
+ metrics-core
+
+
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
index f8f4e4eeb..bd724a530 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
+++ b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
@@ -122,12 +122,12 @@ spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input1.destination=test-topic
-spring.cloud.stream.bindings.input1.content-type=application/json
+spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
spring.cloud.stream.bindings.input2.destination=test-topic
-spring.cloud.stream.bindings.input2.content-type=application/json
+spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
@@ -195,7 +195,7 @@ Spring Boot 应用支持通过 Endpoint 来暴露相关信息,RocketMQ Stream
* Spring Boot 1.x 中添加配置 `management.security.enabled=false`
* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`
-Spring Boot 1.x 可以通过访问 http://127.0.0.1:18083/rocketmq-binder 来查看 RocketMQ Binder Endpoint 的信息。Spring Boot 2.x 可以通过访问 http://127.0.0.1:28081/acutator/rocketmq-binder 来访问。
+Spring Boot 1.x 可以通过访问 http://127.0.0.1:18083/rocketmq_binder 来查看 RocketMQ Binder Endpoint 的信息。Spring Boot 2.x 可以通过访问 http://127.0.0.1:28081/actuator/rocketmq-binder 来访问。
这里会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。
@@ -249,6 +249,14 @@ Spring Boot 1.x 可以通过访问 http://127.0.0.1:18083/rocketmq-binder 来查
}
```
+注意:要想查看统计数据需要在pom里加上 [metrics-core依赖](https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core) 。如若不加,endpoint将会显示warning信息而不会显示统计信息:
+
+```json
+{
+ "warning": "please add metrics-core dependency, we use it to metrics"
+}
+```
+
## More
RocketMQ 是一款功能强大的分布式消息系统,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java
index 652400a2a..e98b6a10a 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java
@@ -6,14 +6,14 @@ package org.springframework.cloud.alibaba.cloud.examples;
public class Foo {
private int id;
- private String tag;
+ private String bar;
public Foo() {
}
- public Foo(int id, String tag) {
+ public Foo(int id, String bar) {
this.id = id;
- this.tag = tag;
+ this.bar = bar;
}
public int getId() {
@@ -24,16 +24,16 @@ public class Foo {
this.id = id;
}
- public String getTag() {
- return tag;
- }
+ public String getBar() {
+ return bar;
+ }
- public void setTag(String tag) {
- this.tag = tag;
- }
+ public void setBar(String bar) {
+ this.bar = bar;
+ }
- @Override
+ @Override
public String toString() {
- return "Foo{" + "id=" + id + ", tag='" + tag + '\'' + '}';
+ return "Foo{" + "id=" + id + ", bar='" + bar + '\'' + '}';
}
}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties
index e936d25cc..c92dd24a3 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties
+++ b/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties
@@ -6,13 +6,13 @@ spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input1.destination=test-topic
-spring.cloud.stream.bindings.input1.content-type=application/json
+spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
spring.cloud.stream.bindings.input1.consumer.maxAttempts=1
spring.cloud.stream.bindings.input2.destination=test-topic
-spring.cloud.stream.bindings.input2.content-type=application/json
+spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml
index 438d27d47..9449c430b 100644
--- a/spring-cloud-stream-binder-rocketmq/pom.xml
+++ b/spring-cloud-stream-binder-rocketmq/pom.xml
@@ -22,13 +22,16 @@
- io.dropwizard.metrics
- metrics-core
+ org.apache.rocketmq
+ rocketmq-client
- org.apache.rocketmq
- rocketmq-client
+ io.dropwizard.metrics
+ metrics-core
+ 4.0.3
+ provided
+ true
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
index 3ee0335c2..cd619acf4 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
@@ -5,6 +5,8 @@ package org.springframework.cloud.stream.binder.rocketmq;
*/
public interface RocketMQBinderConstants {
+ String ENDPOINT_ID = "rocketmq-binder";
+
/**
* Header key
*/
@@ -17,10 +19,27 @@ public interface RocketMQBinderConstants {
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/**
- * Instrumentation key
+ * Instrumentation
*/
String LASTSEND_TIMESTAMP = "lastSend.timestamp";
- String ENDPOINT_ID = "rocketmq-binder";
+ interface Metrics {
+ interface Producer {
+ String PREFIX = "scs-rocketmq.producer.";
+ String TOTAL_SENT = "totalSent";
+ String TOTAL_SENT_FAILURES = "totalSentFailures";
+ String SENT_PER_SECOND = "sentPerSecond";
+ String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond";
+ }
+
+ interface Consumer {
+ String GROUP_PREFIX = "scs-rocketmq.consumerGroup.";
+ String PREFIX = "scs-rocketmq.consumer.";
+ String TOTAL_CONSUMED = "totalConsumed";
+ String CONSUMED_PER_SECOND = "consumedPerSecond";
+ String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures";
+ String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond";
+ }
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
index cb3d5cad4..a43b189c8 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -34,7 +34,6 @@ public class RocketMQMessageChannelBinder extends
.getLogger(RocketMQMessageChannelBinder.class);
private final RocketMQExtendedBindingProperties extendedBindingProperties;
- private final RocketMQTopicProvisioner rocketTopicProvisioner;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager;
@@ -47,7 +46,6 @@ public class RocketMQMessageChannelBinder extends
super(null, provisioningProvider);
this.consumersManager = consumersManager;
this.extendedBindingProperties = extendedBindingProperties;
- this.rocketTopicProvisioner = provisioningProvider;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.instrumentationManager = instrumentationManager;
}
@@ -63,7 +61,7 @@ public class RocketMQMessageChannelBinder extends
}
else {
throw new RuntimeException("Binding for channel " + destination.getName()
- + "has been disabled, message can't be delivered");
+ + " has been disabled, message can't be delivered");
}
}
@@ -74,7 +72,7 @@ public class RocketMQMessageChannelBinder extends
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException(
- "'group' must be configured for channel + " + destination.getName());
+ "'group must be configured for channel + " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
index 5a7f57f53..5c4f9e677 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
@@ -4,12 +4,11 @@ import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderCon
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
-
-import com.codahale.metrics.MetricRegistry;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
/**
* @author Timur Valiev
@@ -18,23 +17,22 @@ import com.codahale.metrics.MetricRegistry;
@Endpoint(id = ENDPOINT_ID)
public class RocketMQBinderEndpoint {
- private MetricRegistry metricRegistry = new MetricRegistry();
- private Map runtime = new ConcurrentHashMap<>();
+ @Autowired(required = false)
+ private InstrumentationManager instrumentationManager;
@ReadOperation
public Map invoke() {
Map result = new HashMap<>();
- result.put("metrics", metricRegistry().getMetrics());
- result.put("runtime", runtime());
+ if (instrumentationManager != null) {
+ result.put("metrics",
+ instrumentationManager.getMetricRegistry().getMetrics());
+ result.put("runtime", instrumentationManager.getRuntime());
+ }
+ else {
+ result.put("warning",
+ "please add metrics-core dependency, we use it for metrics");
+ }
return result;
}
- public MetricRegistry metricRegistry() {
- return metricRegistry;
- }
-
- public Map runtime() {
- return runtime;
- }
-
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
index 82f4ab5ff..b517a6f78 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
@@ -1,5 +1,6 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
@@ -11,28 +12,33 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM
*/
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
- private final InstrumentationManager instrumentationManager;
-
- public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
- this.instrumentationManager = instrumentationManager;
- }
+ @Autowired(required = false)
+ private InstrumentationManager instrumentationManager;
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
- if (instrumentationManager.getHealthInstrumentations().stream()
- .allMatch(Instrumentation::isUp)) {
- builder.up();
- return;
+ if (instrumentationManager != null) {
+ if (instrumentationManager.getHealthInstrumentations().stream()
+ .allMatch(Instrumentation::isUp)) {
+ builder.up();
+ return;
+ }
+ if (instrumentationManager.getHealthInstrumentations().stream()
+ .allMatch(Instrumentation::isOutOfService)) {
+ builder.outOfService();
+ return;
+ }
+ builder.down();
+ instrumentationManager.getHealthInstrumentations().stream()
+ .filter(instrumentation -> !instrumentation.isStarted())
+ .forEach(instrumentation1 -> builder
+ .withException(instrumentation1.getStartException()));
}
- if (instrumentationManager.getHealthInstrumentations().stream()
- .allMatch(Instrumentation::isOutOfService)) {
- builder.outOfService();
- return;
+ else {
+ builder.down();
+ builder.withDetail("warning",
+ "please add metrics-core dependency, we use it for metrics");
}
- builder.down();
- instrumentationManager.getHealthInstrumentations().stream()
- .filter(instrumentation -> !instrumentation.isStarted())
- .forEach(instrumentation1 -> builder
- .withException(instrumentation1.getStartException()));
+
}
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
index 713edb5cf..dda8d4393 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
@@ -25,6 +25,9 @@ public class RocketMQBinderAutoConfiguration {
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+ @Autowired(required = false)
+ private InstrumentationManager instrumentationManager;
+
@Autowired
public RocketMQBinderAutoConfiguration(
RocketMQExtendedBindingProperties extendedBindingProperties,
@@ -43,7 +46,6 @@ public class RocketMQBinderAutoConfiguration {
@Bean
public RocketMQMessageChannelBinder rocketMessageChannelBinder(
RocketMQTopicProvisioner provisioningProvider,
- InstrumentationManager instrumentationManager,
ConsumersManager consumersManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
consumersManager, extendedBindingProperties, provisioningProvider,
@@ -52,8 +54,7 @@ public class RocketMQBinderAutoConfiguration {
}
@Bean
- public ConsumersManager consumersManager(
- InstrumentationManager instrumentationManager) {
+ public ConsumersManager consumersManager() {
return new ConsumersManager(instrumentationManager,
rocketBinderConfigurationProperties);
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java
index 1b0bd0f51..7e40cd5b0 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java
@@ -1,7 +1,9 @@
package org.springframework.cloud.stream.binder.rocketmq.config;
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
+import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
@@ -13,6 +15,7 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
@AutoConfigureAfter(EndpointAutoConfiguration.class)
+@ConditionalOnClass(Endpoint.class)
public class RocketMQBinderEndpointAutoConfiguration {
@Bean
@@ -21,16 +24,14 @@ public class RocketMQBinderEndpointAutoConfiguration {
}
@Bean
- public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(
- InstrumentationManager instrumentationManager) {
- return new RocketMQBinderHealthIndicator(instrumentationManager);
+ public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() {
+ return new RocketMQBinderHealthIndicator();
}
@Bean
- public InstrumentationManager instrumentationManager(
- RocketMQBinderEndpoint rocketBinderEndpoint) {
- return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(),
- rocketBinderEndpoint.runtime());
+ @ConditionalOnClass(name = "com.codahale.metrics.Counter")
+ public InstrumentationManager instrumentationManager() {
+ return new InstrumentationManager();
}
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java
index 4184e3f41..2dd425b03 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java
@@ -3,6 +3,7 @@ package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -27,9 +28,10 @@ public class ConsumersManager {
private final Map consumerGroups = new HashMap<>();
private final Map started = new HashMap<>();
private final Map, ExtendedConsumerProperties> propertiesMap = new HashMap<>();
- private final InstrumentationManager instrumentationManager;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+ private InstrumentationManager instrumentationManager;
+
public ConsumersManager(InstrumentationManager instrumentationManager,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.instrumentationManager = instrumentationManager;
@@ -41,9 +43,12 @@ public class ConsumersManager {
ExtendedConsumerProperties consumerProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic),
consumerProperties);
- ConsumerGroupInstrumentation instrumentation = instrumentationManager
- .getConsumerGroupInstrumentation(group);
- instrumentationManager.addHealthInstrumentation(instrumentation);
+
+ Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
+ ConsumerGroupInstrumentation instrumentation = manager
+ .getConsumerGroupInstrumentation(group);
+ instrumentationManager.addHealthInstrumentation(instrumentation);
+ });
if (consumerGroups.containsKey(group)) {
return consumerGroups.get(group);
@@ -87,16 +92,23 @@ public class ConsumersManager {
if (started.get(group)) {
return;
}
- ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager
- .getConsumerGroupInstrumentation(group);
- instrumentationManager.addHealthInstrumentation(groupInstrumentation);
+
+ ConsumerGroupInstrumentation groupInstrumentation = null;
+ if (Optional.ofNullable(instrumentationManager).isPresent()) {
+ groupInstrumentation = instrumentationManager
+ .getConsumerGroupInstrumentation(group);
+ instrumentationManager.addHealthInstrumentation(groupInstrumentation);
+ }
+
try {
consumerGroups.get(group).start();
started.put(group, true);
- groupInstrumentation.markStartedSuccessfully();
+ Optional.ofNullable(groupInstrumentation)
+ .ifPresent(g -> g.markStartedSuccessfully());
}
catch (MQClientException e) {
- groupInstrumentation.markStartFailed(e);
+ Optional.ofNullable(groupInstrumentation)
+ .ifPresent(g -> g.markStartFailed(e));
logger.error("RocketMQ Consumer hasn't been started. Caused by "
+ e.getErrorMessage(), e);
throw e;
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
index 6b22c400c..1831dd7b5 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
@@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -48,20 +49,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private ConsumerInstrumentation consumerInstrumentation;
+ private InstrumentationManager instrumentationManager;
+
+ private RetryTemplate retryTemplate;
+
+ private RecoveryCallback extends Object> recoveryCallback;
+
private final ExtendedConsumerProperties consumerProperties;
private final String destination;
private final String group;
- private final InstrumentationManager instrumentationManager;
-
private final ConsumersManager consumersManager;
- private RetryTemplate retryTemplate;
-
- private RecoveryCallback extends Object> recoveryCallback;
-
public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
ExtendedConsumerProperties consumerProperties,
String destination, String group,
@@ -75,21 +76,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
@Override
protected void doStart() {
- if (!consumerProperties.getExtension().getEnabled()) {
+ if (consumerProperties == null
+ || !consumerProperties.getExtension().getEnabled()) {
return;
}
- String tags = consumerProperties == null ? null
- : consumerProperties.getExtension().getTags();
- Boolean isOrderly = consumerProperties == null ? false
- : consumerProperties.getExtension().getOrderly();
+ String tags = consumerProperties.getExtension().getTags();
+ Boolean isOrderly = consumerProperties.getExtension().getOrderly();
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
destination, consumerProperties);
final CloudStreamMessageListener listener = isOrderly
- ? new CloudStreamMessageListenerOrderly(instrumentationManager)
- : new CloudStreamMessageListenerConcurrently(instrumentationManager);
+ ? new CloudStreamMessageListenerOrderly()
+ : new CloudStreamMessageListenerConcurrently();
if (retryTemplate != null) {
retryTemplate.registerListener(listener);
@@ -99,9 +99,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
: Arrays.stream(tags.split("\\|\\|")).map(String::trim)
.collect(Collectors.toSet());
- consumerInstrumentation = instrumentationManager
- .getConsumerInstrumentation(destination);
- instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
+ Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
+ consumerInstrumentation = manager.getConsumerInstrumentation(destination);
+ manager.addHealthInstrumentation(consumerInstrumentation);
+ });
try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
@@ -111,10 +112,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
else {
consumer.subscribe(destination, String.join(" || ", tagsSet));
}
- consumerInstrumentation.markStartedSuccessfully();
+ Optional.ofNullable(consumerInstrumentation)
+ .ifPresent(c -> c.markStartedSuccessfully());
}
catch (MQClientException e) {
- consumerInstrumentation.markStartFailed(e);
+ Optional.ofNullable(consumerInstrumentation)
+ .ifPresent(c -> c.markStartFailed(e));
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
+ e.getErrorMessage(), e);
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
@@ -148,12 +151,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
- private final InstrumentationManager instrumentationManager;
-
- CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
- this.instrumentationManager = instrumentationManager;
- }
-
Acknowledgement consumeMessage(final List msgs) {
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
try {
@@ -180,23 +177,29 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
}
else {
Acknowledgement result = doSendMsgs(msgs, null);
- instrumentationManager
- .getConsumerInstrumentation(
- RocketMQInboundChannelAdapter.this.destination)
- .markConsumed();
+ Optional.ofNullable(
+ RocketMQInboundChannelAdapter.this.instrumentationManager)
+ .ifPresent(manager -> {
+ manager.getConsumerInstrumentation(
+ RocketMQInboundChannelAdapter.this.destination)
+ .markConsumed();
+ });
return result;
}
}
catch (Exception e) {
logger.error(
- "Rocket Message hasn't been processed successfully. Caused by ",
+ "RocketMQ Message hasn't been processed successfully. Caused by ",
e);
- instrumentationManager
- .getConsumerInstrumentation(
- RocketMQInboundChannelAdapter.this.destination)
- .markConsumedFailure();
+ Optional.ofNullable(
+ RocketMQInboundChannelAdapter.this.instrumentationManager)
+ .ifPresent(manager -> {
+ manager.getConsumerInstrumentation(
+ RocketMQInboundChannelAdapter.this.destination)
+ .markConsumedFailure();
+ });
throw new RuntimeException(
- "Rocket Message hasn't been processed successfully. Caused by ",
+ "RocketMQ Message hasn't been processed successfully. Caused by ",
e);
}
}
@@ -232,16 +235,22 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
public void close(RetryContext context,
RetryCallback callback, Throwable throwable) {
if (throwable != null) {
- instrumentationManager
- .getConsumerInstrumentation(
- RocketMQInboundChannelAdapter.this.destination)
- .markConsumedFailure();
+ Optional.ofNullable(
+ RocketMQInboundChannelAdapter.this.instrumentationManager)
+ .ifPresent(manager -> {
+ manager.getConsumerInstrumentation(
+ RocketMQInboundChannelAdapter.this.destination)
+ .markConsumedFailure();
+ });
}
else {
- instrumentationManager
- .getConsumerInstrumentation(
- RocketMQInboundChannelAdapter.this.destination)
- .markConsumed();
+ Optional.ofNullable(
+ RocketMQInboundChannelAdapter.this.instrumentationManager)
+ .ifPresent(manager -> {
+ manager.getConsumerInstrumentation(
+ RocketMQInboundChannelAdapter.this.destination)
+ .markConsumed();
+ });
}
}
@@ -254,11 +263,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListenerConcurrently
extends CloudStreamMessageListener implements MessageListenerConcurrently {
- public CloudStreamMessageListenerConcurrently(
- InstrumentationManager instrumentationManager) {
- super(instrumentationManager);
- }
-
@Override
public ConsumeConcurrentlyStatus consumeMessage(final List msgs,
ConsumeConcurrentlyContext context) {
@@ -272,11 +276,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
implements MessageListenerOrderly {
- public CloudStreamMessageListenerOrderly(
- InstrumentationManager instrumentationManager) {
- super(instrumentationManager);
- }
-
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs,
ConsumeOrderlyContext context) {
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
index fb92aba70..1af261390 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
@@ -2,6 +2,7 @@ package org.springframework.cloud.stream.binder.rocketmq.integration;
import java.time.Instant;
import java.util.Map;
+import java.util.Optional;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -30,14 +31,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private ProducerInstrumentation producerInstrumentation;
+ private InstrumentationManager instrumentationManager;
+
private final RocketMQProducerProperties producerProperties;
private final String destination;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
- private final InstrumentationManager instrumentationManager;
-
protected volatile boolean running = false;
public RocketMQMessageHandler(String destination,
@@ -54,9 +55,10 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
public void start() {
producer = new DefaultMQProducer(destination);
- producerInstrumentation = instrumentationManager
- .getProducerInstrumentation(destination);
- instrumentationManager.addHealthInstrumentation(producerInstrumentation);
+ Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
+ producerInstrumentation = manager.getProducerInstrumentation(destination);
+ manager.addHealthInstrumentation(producerInstrumentation);
+ });
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
@@ -66,10 +68,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
try {
producer.start();
- producerInstrumentation.markStartedSuccessfully();
+ Optional.ofNullable(producerInstrumentation)
+ .ifPresent(p -> p.markStartedSuccessfully());
}
catch (MQClientException e) {
- producerInstrumentation.markStartFailed(e);
+ Optional.ofNullable(producerInstrumentation)
+ .ifPresent(p -> p.markStartFailed(e));
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
@@ -127,14 +131,16 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
sendRes);
}
- instrumentationManager.getRuntime().put(
- RocketMQBinderConstants.LASTSEND_TIMESTAMP,
- Instant.now().toEpochMilli());
- producerInstrumentation.markSent();
+ Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
+ manager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP,
+ Instant.now().toEpochMilli());
+ });
+ Optional.ofNullable(producerInstrumentation).ifPresent(p -> p.markSent());
}
catch (MQClientException | RemotingException | MQBrokerException
| InterruptedException | UnsupportedOperationException e) {
- producerInstrumentation.markSentFailure();
+ Optional.ofNullable(producerInstrumentation)
+ .ifPresent(p -> p.markSentFailure());
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java
index 83b54c3ed..87f0c9784 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java
@@ -1,7 +1,5 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import com.codahale.metrics.MetricRegistry;
/**
@@ -11,24 +9,9 @@ import com.codahale.metrics.MetricRegistry;
public class ConsumerGroupInstrumentation extends Instrumentation {
private MetricRegistry metricRegistry;
- private AtomicBoolean delayedStart = new AtomicBoolean(false);
-
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
super(name);
this.metricRegistry = metricRegistry;
}
- public void markDelayedStart() {
- delayedStart.set(true);
- }
-
- @Override
- public boolean isUp() {
- return started.get() || delayedStart.get();
- }
-
- @Override
- public boolean isOutOfService() {
- return !started.get() && startException == null && !delayedStart.get();
- }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
index 764307374..ad72f4d82 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
@@ -2,6 +2,8 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
+
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
@@ -19,13 +21,15 @@ public class ConsumerInstrumentation extends Instrumentation {
public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
- this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
+
+ this.totalConsumed = registry
+ .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED));
this.consumedPerSecond = registry
- .meter(name(baseMetricName, "consumedPerSecond"));
+ .meter(name(baseMetricName, Consumer.CONSUMED_PER_SECOND));
this.totalConsumedFailures = registry
- .counter(name(baseMetricName, "totalConsumedFailures"));
+ .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED_FAILURES));
this.consumedFailuresPerSecond = registry
- .meter(name(baseMetricName, "consumedFailuresPerSecond"));
+ .meter(name(baseMetricName, Consumer.CONSUMED_FAILURES_PER_SECOND));
}
public void markConsumed() {
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
index 601356f2e..811ba01d7 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
@@ -3,8 +3,12 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
+
import com.codahale.metrics.MetricRegistry;
/**
@@ -12,36 +16,32 @@ import com.codahale.metrics.MetricRegistry;
* @author Jim
*/
public class InstrumentationManager {
- private final MetricRegistry metricRegistry;
- private final Map runtime;
+
+ private final MetricRegistry metricRegistry = new MetricRegistry();
+ private final Map runtime = new ConcurrentHashMap<>();
+
private final Map producerInstrumentations = new HashMap<>();
private final Map consumeInstrumentations = new HashMap<>();
private final Map consumerGroupsInstrumentations = new HashMap<>();
private final Map healthInstrumentations = new HashMap<>();
- public InstrumentationManager(MetricRegistry metricRegistry,
- Map runtime) {
- this.metricRegistry = metricRegistry;
- this.runtime = runtime;
- }
-
public ProducerInstrumentation getProducerInstrumentation(String destination) {
- String key = "scs-rocketmq.producer." + destination;
+ String key = Producer.PREFIX + destination;
producerInstrumentations.putIfAbsent(key,
new ProducerInstrumentation(metricRegistry, key));
return producerInstrumentations.get(key);
}
public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
- String key = "scs-rocketmq.consumer." + destination;
+ String key = Consumer.PREFIX + destination;
consumeInstrumentations.putIfAbsent(key,
new ConsumerInstrumentation(metricRegistry, key));
return consumeInstrumentations.get(key);
}
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
- String key = "scs-rocketmq.consumerGroup." + group;
+ String key = Consumer.GROUP_PREFIX + group;
consumerGroupsInstrumentations.putIfAbsent(key,
new ConsumerGroupInstrumentation(metricRegistry, key));
return consumerGroupsInstrumentations.get(key);
@@ -59,4 +59,8 @@ public class InstrumentationManager {
public Map getRuntime() {
return runtime;
}
+
+ public MetricRegistry getMetricRegistry() {
+ return metricRegistry;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
index 1db95ccf5..68c9c0eea 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
@@ -2,6 +2,8 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics;
import static com.codahale.metrics.MetricRegistry.name;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer;
+
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
@@ -19,12 +21,14 @@ public class ProducerInstrumentation extends Instrumentation {
public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
- this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
+
+ this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT));
this.totalSentFailures = registry
- .counter(name(baseMetricName, "totalSentFailures"));
- this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
+ .counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES));
+ this.sentPerSecond = registry
+ .meter(name(baseMetricName, Producer.SENT_PER_SECOND));
this.sentFailuresPerSecond = registry
- .meter(name(baseMetricName, "sentFailuresPerSecond"));
+ .meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND));
}
public void markSent() {
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
index dd00e7d13..087d95c2f 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
@@ -9,7 +9,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties {
- private String namesrvAddr;
+ private String namesrvAddr = "127.0.0.1:9876";
private String logLevel = "ERROR";