fixes #120 and do some refactor

pull/898/head
fangjian0423 6 years ago
parent 05dc57f38f
commit 18e721ceed

@ -189,14 +189,14 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
}
catch (Exception e) {
logger.error(
"Rocket Message hasn't been processed successfully. Caused by ",
"RocketMQ Message hasn't been processed successfully. Caused by ",
e);
instrumentationManager
.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
throw new RuntimeException(
"Rocket Message hasn't been processed successfully. Caused by ",
"RocketMQ Message hasn't been processed successfully. Caused by ",
e);
}
}

@ -1,7 +1,5 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.concurrent.atomic.AtomicBoolean;
import com.codahale.metrics.MetricRegistry;
/**
@ -11,24 +9,9 @@ import com.codahale.metrics.MetricRegistry;
public class ConsumerGroupInstrumentation extends Instrumentation {
private MetricRegistry metricRegistry;
private AtomicBoolean delayedStart = new AtomicBoolean(false);
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
super(name);
this.metricRegistry = metricRegistry;
}
public void markDelayedStart() {
delayedStart.set(true);
}
@Override
public boolean isUp() {
return started.get() || delayedStart.get();
}
@Override
public boolean isOutOfService() {
return !started.get() && startException == null && !delayedStart.get();
}
}

@ -9,7 +9,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties {
private String namesrvAddr;
private String namesrvAddr = "127.0.0.1:9876";
private String logLevel = "ERROR";

Loading…
Cancel
Save