From 18e721ceedfca9a0c94b746fc0f8bb0e0af774eb Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Tue, 27 Nov 2018 13:55:38 +0800 Subject: [PATCH] fixes #120 and do some refactor --- .../RocketMQInboundChannelAdapter.java | 4 ++-- .../metrics/ConsumerGroupInstrumentation.java | 17 ----------------- .../RocketMQBinderConfigurationProperties.java | 2 +- 3 files changed, 3 insertions(+), 20 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index 6b22c400c..84c2898e2 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -189,14 +189,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } catch (Exception e) { logger.error( - "Rocket Message hasn't been processed successfully. Caused by ", + "RocketMQ Message hasn't been processed successfully. Caused by ", e); instrumentationManager .getConsumerInstrumentation( RocketMQInboundChannelAdapter.this.destination) .markConsumedFailure(); throw new RuntimeException( - "Rocket Message hasn't been processed successfully. Caused by ", + "RocketMQ Message hasn't been processed successfully. Caused by ", e); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java index 83b54c3ed..87f0c9784 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java @@ -1,7 +1,5 @@ package org.springframework.cloud.stream.binder.rocketmq.metrics; -import java.util.concurrent.atomic.AtomicBoolean; - import com.codahale.metrics.MetricRegistry; /** @@ -11,24 +9,9 @@ import com.codahale.metrics.MetricRegistry; public class ConsumerGroupInstrumentation extends Instrumentation { private MetricRegistry metricRegistry; - private AtomicBoolean delayedStart = new AtomicBoolean(false); - public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) { super(name); this.metricRegistry = metricRegistry; } - public void markDelayedStart() { - delayedStart.set(true); - } - - @Override - public boolean isUp() { - return started.get() || delayedStart.get(); - } - - @Override - public boolean isOutOfService() { - return !started.get() && startException == null && !delayedStart.get(); - } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index dd00e7d13..087d95c2f 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -9,7 +9,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") public class RocketMQBinderConfigurationProperties { - private String namesrvAddr; + private String namesrvAddr = "127.0.0.1:9876"; private String logLevel = "ERROR";