add rocketmq-binder and starter
parent
10bb95c11a
commit
3263d203c2
@ -0,0 +1,20 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba</artifactId>
|
||||
<version>0.1.1.BUILD-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
|
||||
<name>Spring Cloud Starter Stream RocketMQ</name>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -0,0 +1,71 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-alibaba</artifactId>
|
||||
<version>0.1.1.BUILD-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
|
||||
<name>Spring Cloud Alibaba RocketMQ Binder</name>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-client</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-autoconfigure</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-actuator</artifactId>
|
||||
<scope>provided</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
||||
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public interface RocketMQBinderConstants {
|
||||
|
||||
/**
|
||||
* Header key
|
||||
*/
|
||||
String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE";
|
||||
|
||||
String ROCKET_FLAG = "ROCKETMQ_FLAG";
|
||||
|
||||
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
|
||||
|
||||
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
|
||||
|
||||
/**
|
||||
* Instrumentation key
|
||||
*/
|
||||
String LASTSEND_TIMESTAMP = "lastSend.timestamp";
|
||||
|
||||
String ENDPOINT_ID = "rocketmq_binder";
|
||||
|
||||
}
|
@ -0,0 +1,117 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.integration.core.MessageProducer;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQMessageChannelBinder extends
|
||||
AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner>
|
||||
implements ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class);
|
||||
|
||||
private final RocketMQExtendedBindingProperties extendedBindingProperties;
|
||||
private final RocketMQTopicProvisioner rocketTopicProvisioner;
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
private final ConsumersManager consumersManager;
|
||||
|
||||
public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
|
||||
RocketMQExtendedBindingProperties extendedBindingProperties,
|
||||
RocketMQTopicProvisioner provisioningProvider,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
super(true, null, provisioningProvider);
|
||||
this.consumersManager = consumersManager;
|
||||
this.extendedBindingProperties = extendedBindingProperties;
|
||||
this.rocketTopicProvisioner = provisioningProvider;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties>
|
||||
producerProperties,
|
||||
MessageChannel errorChannel) throws Exception {
|
||||
if (producerProperties.getExtension().getEnabled()) {
|
||||
return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(),
|
||||
rocketBinderConfigurationProperties, instrumentationManager);
|
||||
} else {
|
||||
throw new RuntimeException(
|
||||
"Binding for channel " + destination.getName() + "has been disabled, message can't be delivered");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
|
||||
ExtendedConsumerProperties<RocketMQConsumerProperties>
|
||||
consumerProperties)
|
||||
throws Exception {
|
||||
if (group == null || "".equals(group)) {
|
||||
throw new RuntimeException("'group' must be configured for channel + " + destination.getName());
|
||||
}
|
||||
|
||||
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager,
|
||||
consumerProperties, destination.getName(), group, instrumentationManager);
|
||||
|
||||
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group,
|
||||
consumerProperties);
|
||||
if (consumerProperties.getMaxAttempts() > 1) {
|
||||
rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));
|
||||
rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
|
||||
} else {
|
||||
rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
|
||||
}
|
||||
|
||||
return rocketInboundChannelAdapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
|
||||
return extendedBindingProperties.getExtendedConsumerProperties(channelName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
|
||||
return extendedBindingProperties.getExtendedProducerProperties(channelName);
|
||||
}
|
||||
}
|
@ -0,0 +1,123 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
|
||||
import org.springframework.integration.support.MutableMessage;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageHeaderAccessor;
|
||||
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
|
||||
|
||||
public RocketMQMessageHeaderAccessor() {
|
||||
super();
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor(Message<?> message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public Acknowledgement getAcknowledgement(Message message) {
|
||||
return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) {
|
||||
setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getTags() {
|
||||
return getMessageHeaders().get(MessageConst.PROPERTY_TAGS) == null ? "" : (String) getMessageHeaders().get(MessageConst.PROPERTY_TAGS);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withTags(String tag) {
|
||||
setHeader(MessageConst.PROPERTY_TAGS, tag);
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getKeys() {
|
||||
return getMessageHeaders().get(MessageConst.PROPERTY_KEYS) == null ? "" : (String) getMessageHeaders().get(MessageConst.PROPERTY_KEYS);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withKeys(String keys) {
|
||||
setHeader(MessageConst.PROPERTY_KEYS, keys);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MessageExt getRocketMessage() {
|
||||
return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) {
|
||||
setHeader(ORIGINAL_ROCKET_MESSAGE, message);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Integer getDelayTimeLevel() {
|
||||
return NumberUtils.toInt((String)getMessageHeaders().get(MessageConst.PROPERTY_DELAY_TIME_LEVEL), 0);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
|
||||
setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Integer getFlag() {
|
||||
return NumberUtils.toInt((String)getMessageHeaders().get(ROCKET_FLAG), 0);
|
||||
}
|
||||
|
||||
public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
|
||||
setHeader(ROCKET_FLAG, delayTimeLevel);
|
||||
return this;
|
||||
}
|
||||
|
||||
public SendResult getSendResult() {
|
||||
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
|
||||
}
|
||||
|
||||
public static void putSendResult(MutableMessage message, SendResult sendResult) {
|
||||
message.getHeaders().put(ROCKET_SEND_RESULT, sendResult);
|
||||
}
|
||||
|
||||
public Map<String, String> getUserProperties() {
|
||||
Map<String, String> result = new HashMap<>();
|
||||
for (Map.Entry<String, Object> entry : this.toMap().entrySet()) {
|
||||
if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry
|
||||
.getKey().equals(MessageHeaders.CONTENT_TYPE)) {
|
||||
result.put(entry.getKey(), (String)entry.getValue());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.actuator;
|
||||
|
||||
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQBinderEndpoint extends AbstractEndpoint<Map<String, Object>> {
|
||||
|
||||
private MetricRegistry metricRegistry = new MetricRegistry();
|
||||
private Map<String, Object> runtime = new ConcurrentHashMap<>();
|
||||
|
||||
public RocketMQBinderEndpoint() {
|
||||
super(ENDPOINT_ID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> invoke() {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
result.put("metrics", metricRegistry().getMetrics());
|
||||
result.put("runtime", runtime());
|
||||
return result;
|
||||
}
|
||||
|
||||
public MetricRegistry metricRegistry() {
|
||||
return metricRegistry;
|
||||
}
|
||||
|
||||
public Map<String, Object> runtime() {
|
||||
return runtime;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.actuator;
|
||||
|
||||
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
|
||||
public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doHealthCheck(Health.Builder builder) throws Exception {
|
||||
int upCount = 0, outOfServiceCount = 0;
|
||||
for (Instrumentation instrumentation : instrumentationManager
|
||||
.getHealthInstrumentations()) {
|
||||
if (instrumentation.isUp()) {
|
||||
upCount++;
|
||||
}
|
||||
else if (instrumentation.isOutOfService()) {
|
||||
upCount++;
|
||||
}
|
||||
}
|
||||
if (upCount == instrumentationManager.getHealthInstrumentations().size()) {
|
||||
builder.up();
|
||||
return;
|
||||
}
|
||||
else if (outOfServiceCount == instrumentationManager.getHealthInstrumentations()
|
||||
.size()) {
|
||||
builder.outOfService();
|
||||
return;
|
||||
}
|
||||
builder.down();
|
||||
|
||||
for (Instrumentation instrumentation : instrumentationManager
|
||||
.getHealthInstrumentations()) {
|
||||
if (!instrumentation.isStarted()) {
|
||||
builder.withException(instrumentation.getStartException());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import org.apache.rocketmq.client.log.ClientLogger;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Configuration
|
||||
@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class})
|
||||
public class RocketMQBinderAutoConfiguration {
|
||||
|
||||
private final RocketMQExtendedBindingProperties extendedBindingProperties;
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
@Autowired
|
||||
public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
|
||||
this.extendedBindingProperties = extendedBindingProperties;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RocketMQTopicProvisioner provisioningProvider() {
|
||||
return new RocketMQTopicProvisioner();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
|
||||
InstrumentationManager instrumentationManager,
|
||||
ConsumersManager consumersManager) {
|
||||
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties,
|
||||
provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager);
|
||||
return binder;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) {
|
||||
return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Configuration
|
||||
@AutoConfigureAfter(EndpointAutoConfiguration.class)
|
||||
public class RocketMQBinderEndpointAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public RocketMQBinderEndpoint rocketBinderEndpoint() {
|
||||
return new RocketMQBinderEndpoint();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(
|
||||
InstrumentationManager instrumentationManager) {
|
||||
return new RocketMQBinderHealthIndicator(instrumentationManager);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public InstrumentationManager instrumentationManager(
|
||||
RocketMQBinderEndpoint rocketBinderEndpoint) {
|
||||
return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(),
|
||||
rocketBinderEndpoint.runtime());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.consuming;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class Acknowledgement {
|
||||
|
||||
/**
|
||||
* for {@link ConsumeConcurrentlyContext} using
|
||||
*/
|
||||
private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
/**
|
||||
* Message consume retry strategy<br>
|
||||
* -1,no retry,put into DLQ directly<br>
|
||||
* 0,broker control retry frequency<br>
|
||||
* >0,client control retry frequency
|
||||
*/
|
||||
private Integer consumeConcurrentlyDelayLevel = 0;
|
||||
|
||||
/**
|
||||
* for {@link ConsumeOrderlyContext} using
|
||||
*/
|
||||
private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
|
||||
private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L;
|
||||
|
||||
public Acknowledgement setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
|
||||
this.consumeConcurrentlyStatus = consumeConcurrentlyStatus;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() {
|
||||
return consumeConcurrentlyStatus;
|
||||
}
|
||||
|
||||
public ConsumeOrderlyStatus getConsumeOrderlyStatus() {
|
||||
return consumeOrderlyStatus;
|
||||
}
|
||||
|
||||
public Acknowledgement setConsumeOrderlyStatus(ConsumeOrderlyStatus consumeOrderlyStatus) {
|
||||
this.consumeOrderlyStatus = consumeOrderlyStatus;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Integer getConsumeConcurrentlyDelayLevel() {
|
||||
return consumeConcurrentlyDelayLevel;
|
||||
}
|
||||
|
||||
public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) {
|
||||
this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel;
|
||||
}
|
||||
|
||||
public Long getConsumeOrderlySuspendCurrentQueueTimeMill() {
|
||||
return consumeOrderlySuspendCurrentQueueTimeMill;
|
||||
}
|
||||
|
||||
public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) {
|
||||
this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
|
||||
}
|
||||
|
||||
public static Acknowledgement buildOrderlyInstance() {
|
||||
Acknowledgement acknowledgement = new Acknowledgement();
|
||||
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS);
|
||||
return acknowledgement;
|
||||
}
|
||||
|
||||
public static Acknowledgement buildConcurrentlyInstance() {
|
||||
Acknowledgement acknowledgement = new Acknowledgement();
|
||||
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
|
||||
return acknowledgement;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,122 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.consuming;
|
||||
|
||||
import java.util.AbstractMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class ConsumersManager {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
|
||||
private final Map<String, Boolean> started = new HashMap<>();
|
||||
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap
|
||||
= new HashMap<>();
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
public ConsumersManager(InstrumentationManager instrumentationManager,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
}
|
||||
|
||||
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic,
|
||||
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
|
||||
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties);
|
||||
ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group);
|
||||
instrumentationManager.addHealthInstrumentation(instrumentation);
|
||||
|
||||
if (consumerGroups.containsKey(group)) {
|
||||
return consumerGroups.get(group);
|
||||
}
|
||||
|
||||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
|
||||
consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
|
||||
consumerGroups.put(group, consumer);
|
||||
started.put(group, false);
|
||||
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
|
||||
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
|
||||
if (consumerProperties.getExtension().getBroadcasting()) {
|
||||
consumer.setMessageModel(MessageModel.BROADCASTING);
|
||||
}
|
||||
logger.info("RocketMQ consuming for SCS group {} created", group);
|
||||
return consumer;
|
||||
}
|
||||
|
||||
public synchronized void startConsumers() throws MQClientException {
|
||||
for (String group : getConsumerGroups()) {
|
||||
start(group);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void startConsumer(String group) throws MQClientException {
|
||||
start(group);
|
||||
}
|
||||
|
||||
public synchronized void stopConsumer(String group) {
|
||||
stop(group);
|
||||
}
|
||||
|
||||
private void stop(String group) {
|
||||
if (consumerGroups.get(group) != null) {
|
||||
consumerGroups.get(group).shutdown();
|
||||
started.put(group, false);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void start(String group) throws MQClientException {
|
||||
if (started.get(group)) {
|
||||
return;
|
||||
}
|
||||
ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation(
|
||||
group);
|
||||
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
|
||||
try {
|
||||
consumerGroups.get(group).start();
|
||||
started.put(group, true);
|
||||
groupInstrumentation.markStartedSuccessfully();
|
||||
} catch (MQClientException e) {
|
||||
groupInstrumentation.markStartFailed(e);
|
||||
logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized Set<String> getConsumerGroups() {
|
||||
return consumerGroups.keySet();
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,314 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.integration;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang3.ClassUtils;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListener;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import org.springframework.integration.endpoint.MessageProducerSupport;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.RecoveryCallback;
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryListener;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(RocketMQInboundChannelAdapter.class);
|
||||
|
||||
private ConsumerInstrumentation consumerInstrumentation;
|
||||
|
||||
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
|
||||
|
||||
private final String destination;
|
||||
|
||||
private final String group;
|
||||
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
|
||||
private final ConsumersManager consumersManager;
|
||||
|
||||
private RetryTemplate retryTemplate;
|
||||
|
||||
private RecoveryCallback<? extends Object> recoveryCallback;
|
||||
|
||||
public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
|
||||
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
|
||||
String destination, String group,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
this.consumersManager = consumersManager;
|
||||
this.consumerProperties = consumerProperties;
|
||||
this.destination = destination;
|
||||
this.group = group;
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
if (!consumerProperties.getExtension().getEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
String tags = consumerProperties == null ? null
|
||||
: consumerProperties.getExtension().getTags();
|
||||
Boolean isOrderly = consumerProperties == null ? false
|
||||
: consumerProperties.getExtension().getOrderly();
|
||||
|
||||
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
|
||||
destination, consumerProperties);
|
||||
|
||||
final CloudStreamMessageListener listener = isOrderly
|
||||
? new CloudStreamMessageListenerOrderly(instrumentationManager)
|
||||
: new CloudStreamMessageListenerConcurrently(instrumentationManager);
|
||||
|
||||
if (retryTemplate != null) {
|
||||
retryTemplate.registerListener(listener);
|
||||
}
|
||||
|
||||
Set<String> tagsSet = new HashSet<>();
|
||||
if (!StringUtils.isEmpty(tags)) {
|
||||
for (String tag : tags.split("\\|\\|")) {
|
||||
tagsSet.add(tag.trim());
|
||||
}
|
||||
}
|
||||
|
||||
consumerInstrumentation = instrumentationManager
|
||||
.getConsumerInstrumentation(destination);
|
||||
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
|
||||
|
||||
try {
|
||||
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
|
||||
consumer.subscribe(destination, MessageSelector
|
||||
.bySql(consumerProperties.getExtension().getSql()));
|
||||
}
|
||||
else {
|
||||
consumer.subscribe(destination,
|
||||
org.apache.commons.lang3.StringUtils.join(tagsSet, " || "));
|
||||
}
|
||||
consumerInstrumentation.markStartedSuccessfully();
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
consumerInstrumentation.markStartFailed(e);
|
||||
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
|
||||
+ e.getErrorMessage(), e);
|
||||
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
|
||||
}
|
||||
|
||||
consumer.registerMessageListener(listener);
|
||||
|
||||
try {
|
||||
consumersManager.startConsumer(group);
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
logger.error(
|
||||
"RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(),
|
||||
e);
|
||||
throw new RuntimeException("RocketMQ Consumer startup failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
consumersManager.stopConsumer(group);
|
||||
}
|
||||
|
||||
public void setRetryTemplate(RetryTemplate retryTemplate) {
|
||||
this.retryTemplate = retryTemplate;
|
||||
}
|
||||
|
||||
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
|
||||
this.recoveryCallback = recoveryCallback;
|
||||
}
|
||||
|
||||
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
|
||||
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
|
||||
CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
}
|
||||
|
||||
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
|
||||
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
|
||||
try {
|
||||
if (enableRetry) {
|
||||
return RocketMQInboundChannelAdapter.this.retryTemplate.execute(
|
||||
// (RetryCallback<Acknowledgement, Exception>)
|
||||
new RetryCallback<Acknowledgement, Exception>() {
|
||||
@Override
|
||||
public Acknowledgement doWithRetry(RetryContext context)
|
||||
throws Exception {
|
||||
return doSendMsgs(msgs, context);
|
||||
}
|
||||
}, new RecoveryCallback<Acknowledgement>() {
|
||||
@Override
|
||||
public Acknowledgement recover(RetryContext context)
|
||||
throws Exception {
|
||||
RocketMQInboundChannelAdapter.this.recoveryCallback
|
||||
.recover(context);
|
||||
if (ClassUtils.isAssignable(this.getClass(),
|
||||
MessageListenerConcurrently.class)) {
|
||||
return Acknowledgement
|
||||
.buildConcurrentlyInstance();
|
||||
}
|
||||
else {
|
||||
return Acknowledgement.buildOrderlyInstance();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
else {
|
||||
Acknowledgement result = doSendMsgs(msgs, null);
|
||||
instrumentationManager
|
||||
.getConsumerInstrumentation(
|
||||
RocketMQInboundChannelAdapter.this.destination)
|
||||
.markConsumed();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error(
|
||||
"Rocket Message hasn't been processed successfully. Caused by ",
|
||||
e);
|
||||
instrumentationManager
|
||||
.getConsumerInstrumentation(
|
||||
RocketMQInboundChannelAdapter.this.destination)
|
||||
.markConsumedFailure();
|
||||
throw new RuntimeException(
|
||||
"Rocket Message hasn't been processed successfully. Caused by ",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
private Acknowledgement doSendMsgs(final List<MessageExt> msgs,
|
||||
RetryContext context) {
|
||||
List<Acknowledgement> acknowledgements = new ArrayList<>();
|
||||
for (MessageExt msg : msgs) {
|
||||
String retryInfo = context == null ? ""
|
||||
: "retryCount-" + String.valueOf(context.getRetryCount()) + "|";
|
||||
logger.debug(retryInfo + "consuming msg:\n" + msg);
|
||||
logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
|
||||
Acknowledgement acknowledgement = new Acknowledgement();
|
||||
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody())
|
||||
.setHeaders(new RocketMQMessageHeaderAccessor()
|
||||
.withAcknowledgment(acknowledgement)
|
||||
.withTags(msg.getTags()).withKeys(msg.getKeys())
|
||||
.withFlag(msg.getFlag()).withRocketMessage(msg))
|
||||
.build();
|
||||
acknowledgements.add(acknowledgement);
|
||||
RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
|
||||
}
|
||||
return acknowledgements.get(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, E extends Throwable> boolean open(RetryContext context,
|
||||
RetryCallback<T, E> callback) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, E extends Throwable> void close(RetryContext context,
|
||||
RetryCallback<T, E> callback, Throwable throwable) {
|
||||
if (throwable != null) {
|
||||
instrumentationManager
|
||||
.getConsumerInstrumentation(
|
||||
RocketMQInboundChannelAdapter.this.destination)
|
||||
.markConsumedFailure();
|
||||
}
|
||||
else {
|
||||
instrumentationManager
|
||||
.getConsumerInstrumentation(
|
||||
RocketMQInboundChannelAdapter.this.destination)
|
||||
.markConsumed();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, E extends Throwable> void onError(RetryContext context,
|
||||
RetryCallback<T, E> callback, Throwable throwable) {
|
||||
}
|
||||
}
|
||||
|
||||
protected class CloudStreamMessageListenerConcurrently
|
||||
extends CloudStreamMessageListener implements MessageListenerConcurrently {
|
||||
|
||||
public CloudStreamMessageListenerConcurrently(
|
||||
InstrumentationManager instrumentationManager) {
|
||||
super(instrumentationManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
|
||||
ConsumeConcurrentlyContext context) {
|
||||
Acknowledgement acknowledgement = consumeMessage(msgs);
|
||||
context.setDelayLevelWhenNextConsume(
|
||||
acknowledgement.getConsumeConcurrentlyDelayLevel());
|
||||
return acknowledgement.getConsumeConcurrentlyStatus();
|
||||
}
|
||||
}
|
||||
|
||||
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
|
||||
implements MessageListenerOrderly {
|
||||
|
||||
public CloudStreamMessageListenerOrderly(
|
||||
InstrumentationManager instrumentationManager) {
|
||||
super(instrumentationManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
|
||||
ConsumeOrderlyContext context) {
|
||||
Acknowledgement acknowledgement = consumeMessage(msgs);
|
||||
context.setSuspendCurrentQueueTimeMillis(
|
||||
(acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
|
||||
return acknowledgement.getConsumeOrderlyStatus();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,160 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.integration;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.client.producer.SendStatus;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.integration.handler.AbstractMessageHandler;
|
||||
import org.springframework.integration.support.MutableMessage;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
|
||||
|
||||
private DefaultMQProducer producer;
|
||||
|
||||
private ProducerInstrumentation producerInstrumentation;
|
||||
|
||||
private final RocketMQProducerProperties producerProperties;
|
||||
|
||||
private final String destination;
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
|
||||
protected volatile boolean running = false;
|
||||
|
||||
public RocketMQMessageHandler(String destination,
|
||||
RocketMQProducerProperties producerProperties,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
this.destination = destination;
|
||||
this.producerProperties = producerProperties;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
producer = new DefaultMQProducer(destination);
|
||||
|
||||
producerInstrumentation = instrumentationManager
|
||||
.getProducerInstrumentation(destination);
|
||||
instrumentationManager.addHealthInstrumentation(producerInstrumentation);
|
||||
|
||||
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
|
||||
|
||||
if (producerProperties.getMaxMessageSize() > 0) {
|
||||
producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
|
||||
}
|
||||
|
||||
try {
|
||||
producer.start();
|
||||
producerInstrumentation.markStartedSuccessfully();
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
producerInstrumentation.markStartFailed(e);
|
||||
logger.error(
|
||||
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
|
||||
throw new MessagingException(e.getMessage(), e);
|
||||
}
|
||||
running = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (producer != null) {
|
||||
producer.shutdown();
|
||||
}
|
||||
running = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
|
||||
throws Exception {
|
||||
try {
|
||||
Message toSend;
|
||||
if (message.getPayload() instanceof byte[]) {
|
||||
toSend = new Message(destination, (byte[]) message.getPayload());
|
||||
}
|
||||
else if (message.getPayload() instanceof String) {
|
||||
toSend = new Message(destination,
|
||||
((String) message.getPayload()).getBytes());
|
||||
}
|
||||
else {
|
||||
throw new UnsupportedOperationException("Payload class isn't supported: "
|
||||
+ message.getPayload().getClass());
|
||||
}
|
||||
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(
|
||||
message);
|
||||
headerAccessor.setLeaveMutable(true);
|
||||
toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel());
|
||||
toSend.setTags(headerAccessor.getTags());
|
||||
toSend.setKeys(headerAccessor.getKeys());
|
||||
toSend.setFlag(headerAccessor.getFlag());
|
||||
for (Map.Entry<String, String> entry : headerAccessor.getUserProperties()
|
||||
.entrySet()) {
|
||||
toSend.putUserProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
SendResult sendRes = producer.send(toSend);
|
||||
|
||||
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
|
||||
throw new MQClientException("message hasn't been sent", null);
|
||||
}
|
||||
if (message instanceof MutableMessage) {
|
||||
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
|
||||
sendRes);
|
||||
}
|
||||
instrumentationManager.getRuntime().put(
|
||||
RocketMQBinderConstants.LASTSEND_TIMESTAMP,
|
||||
System.currentTimeMillis());
|
||||
producerInstrumentation.markSent();
|
||||
}
|
||||
catch (MQClientException | RemotingException | MQBrokerException
|
||||
| InterruptedException | UnsupportedOperationException e) {
|
||||
producerInstrumentation.markSentFailure();
|
||||
logger.error(
|
||||
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
|
||||
throw new MessagingException(e.getMessage(), e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class ConsumerGroupInstrumentation extends Instrumentation {
|
||||
private MetricRegistry metricRegistry;
|
||||
|
||||
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
|
||||
super(name);
|
||||
this.metricRegistry = metricRegistry;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
/**
|
||||
* @author juven.xuxb
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class ConsumerInstrumentation extends Instrumentation {
|
||||
|
||||
private final Counter totalConsumed;
|
||||
private final Counter totalConsumedFailures;
|
||||
private final Meter consumedPerSecond;
|
||||
private final Meter consumedFailuresPerSecond;
|
||||
|
||||
public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
|
||||
super(baseMetricName);
|
||||
this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
|
||||
this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond"));
|
||||
this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures"));
|
||||
this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond"));
|
||||
}
|
||||
|
||||
public void markConsumed() {
|
||||
totalConsumed.inc();
|
||||
consumedPerSecond.mark();
|
||||
}
|
||||
|
||||
public void markConsumedFailure() {
|
||||
totalConsumedFailures.inc();
|
||||
consumedFailuresPerSecond.mark();
|
||||
}
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class Instrumentation {
|
||||
private final String name;
|
||||
protected final AtomicBoolean started = new AtomicBoolean(false);
|
||||
protected Exception startException = null;
|
||||
|
||||
Instrumentation(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public boolean isDown() {
|
||||
return startException != null;
|
||||
}
|
||||
|
||||
public boolean isUp() {
|
||||
return started.get();
|
||||
}
|
||||
|
||||
public boolean isOutOfService() {
|
||||
return !started.get() && startException == null;
|
||||
}
|
||||
|
||||
public void markStartedSuccessfully() {
|
||||
started.set(true);
|
||||
}
|
||||
|
||||
public void markStartFailed(Exception e) {
|
||||
started.set(false);
|
||||
startException = e;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public boolean isStarted() {
|
||||
return started.get();
|
||||
}
|
||||
|
||||
public Exception getStartException() {
|
||||
return startException;
|
||||
}
|
||||
}
|
@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class InstrumentationManager {
|
||||
private final MetricRegistry metricRegistry;
|
||||
private final Map<String, Object> runtime;
|
||||
private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>();
|
||||
private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>();
|
||||
private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>();
|
||||
|
||||
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>();
|
||||
|
||||
public InstrumentationManager(MetricRegistry metricRegistry,
|
||||
Map<String, Object> runtime) {
|
||||
this.metricRegistry = metricRegistry;
|
||||
this.runtime = runtime;
|
||||
}
|
||||
|
||||
public ProducerInstrumentation getProducerInstrumentation(String destination) {
|
||||
String key = "scs-rocketmq.producer." + destination;
|
||||
ProducerInstrumentation producerInstrumentation = producerInstrumentations
|
||||
.get(key);
|
||||
if (producerInstrumentation == null) {
|
||||
producerInstrumentations.put(key,
|
||||
new ProducerInstrumentation(metricRegistry, key));
|
||||
}
|
||||
return producerInstrumentations.get(key);
|
||||
}
|
||||
|
||||
public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
|
||||
String key = "scs-rocketmq.consumer." + destination;
|
||||
ConsumerInstrumentation consumerInstrumentation = consumeInstrumentations
|
||||
.get(key);
|
||||
if (consumerInstrumentation == null) {
|
||||
consumeInstrumentations.put(key,
|
||||
new ConsumerInstrumentation(metricRegistry, key));
|
||||
}
|
||||
return consumeInstrumentations.get(key);
|
||||
}
|
||||
|
||||
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
|
||||
String key = "scs-rocketmq.consumerGroup." + group;
|
||||
ConsumerGroupInstrumentation consumerGroupInstrumentation = consumerGroupsInstrumentations
|
||||
.get(key);
|
||||
if (consumerGroupInstrumentation == null) {
|
||||
consumerGroupsInstrumentations.put(key,
|
||||
new ConsumerGroupInstrumentation(metricRegistry, key));
|
||||
}
|
||||
return consumerGroupsInstrumentations.get(key);
|
||||
}
|
||||
|
||||
public Set<Instrumentation> getHealthInstrumentations() {
|
||||
return new HashSet<>(healthInstrumentations.values());
|
||||
}
|
||||
|
||||
public void addHealthInstrumentation(Instrumentation instrumentation) {
|
||||
healthInstrumentations.put(instrumentation.getName(), instrumentation);
|
||||
}
|
||||
|
||||
public Map<String, Object> getRuntime() {
|
||||
return runtime;
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.metrics;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
/**
|
||||
* @author juven.xuxb
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class ProducerInstrumentation extends Instrumentation {
|
||||
|
||||
private final Counter totalSent;
|
||||
private final Counter totalSentFailures;
|
||||
private final Meter sentPerSecond;
|
||||
private final Meter sentFailuresPerSecond;
|
||||
|
||||
public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
|
||||
super(baseMetricName);
|
||||
this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
|
||||
this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures"));
|
||||
this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
|
||||
this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond"));
|
||||
}
|
||||
|
||||
public void markSent() {
|
||||
totalSent.inc();
|
||||
sentPerSecond.mark();
|
||||
}
|
||||
|
||||
public void markSentFailure() {
|
||||
totalSentFailures.inc();
|
||||
sentFailuresPerSecond.mark();
|
||||
}
|
||||
}
|
@ -0,0 +1,48 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
|
||||
public class RocketMQBinderConfigurationProperties {
|
||||
|
||||
private String namesrvAddr = "127.0.0.1:9876";
|
||||
|
||||
private String logLevel = "ERROR";
|
||||
|
||||
public String getNamesrvAddr() {
|
||||
return namesrvAddr;
|
||||
}
|
||||
|
||||
public void setNamesrvAddr(String namesrvAddr) {
|
||||
this.namesrvAddr = namesrvAddr;
|
||||
}
|
||||
|
||||
public String getLogLevel() {
|
||||
return logLevel;
|
||||
}
|
||||
|
||||
public void setLogLevel(String logLevel) {
|
||||
this.logLevel = logLevel;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQBindingProperties {
|
||||
|
||||
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
|
||||
|
||||
private RocketMQProducerProperties producer = new RocketMQProducerProperties();
|
||||
|
||||
public RocketMQConsumerProperties getConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
public void setConsumer(RocketMQConsumerProperties consumer) {
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
public RocketMQProducerProperties getProducer() {
|
||||
return producer;
|
||||
}
|
||||
|
||||
public void setProducer(RocketMQProducerProperties producer) {
|
||||
this.producer = producer;
|
||||
}
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.MQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQConsumerProperties {
|
||||
|
||||
/**
|
||||
* using '||' to split tag
|
||||
* {@link MQPushConsumer#subscribe(String, String)}
|
||||
*/
|
||||
private String tags;
|
||||
|
||||
/**
|
||||
* {@link MQPushConsumer#subscribe(String, MessageSelector)}
|
||||
* {@link MessageSelector#bySql(String)}
|
||||
*/
|
||||
private String sql;
|
||||
|
||||
/**
|
||||
* {@link MessageModel#BROADCASTING}
|
||||
*/
|
||||
private Boolean broadcasting = false;
|
||||
|
||||
/**
|
||||
* if orderly is true, using {@link MessageListenerOrderly}
|
||||
* else if orderly if false, using {@link MessageListenerConcurrently}
|
||||
*/
|
||||
private Boolean orderly = false;
|
||||
|
||||
private Boolean enabled = true;
|
||||
|
||||
public String getTags() {
|
||||
return tags;
|
||||
}
|
||||
|
||||
public void setTags(String tags) {
|
||||
this.tags = tags;
|
||||
}
|
||||
|
||||
public String getSql() {
|
||||
return sql;
|
||||
}
|
||||
|
||||
public void setSql(String sql) {
|
||||
this.sql = sql;
|
||||
}
|
||||
|
||||
public Boolean getOrderly() {
|
||||
return orderly;
|
||||
}
|
||||
|
||||
public void setOrderly(Boolean orderly) {
|
||||
this.orderly = orderly;
|
||||
}
|
||||
|
||||
public Boolean getEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(Boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public Boolean getBroadcasting() {
|
||||
return broadcasting;
|
||||
}
|
||||
|
||||
public void setBroadcasting(Boolean broadcasting) {
|
||||
this.broadcasting = broadcasting;
|
||||
}
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@ConfigurationProperties("spring.cloud.stream.rocketmq")
|
||||
public class RocketMQExtendedBindingProperties implements
|
||||
ExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties> {
|
||||
|
||||
private Map<String, RocketMQBindingProperties> bindings = new HashMap<>();
|
||||
|
||||
public Map<String, RocketMQBindingProperties> getBindings() {
|
||||
return this.bindings;
|
||||
}
|
||||
|
||||
public void setBindings(Map<String, RocketMQBindingProperties> bindings) {
|
||||
this.bindings = bindings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
|
||||
if (bindings.containsKey(channelName)) {
|
||||
if (bindings.get(channelName).getConsumer() != null) {
|
||||
return bindings.get(channelName).getConsumer();
|
||||
} else {
|
||||
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
|
||||
this.bindings.get(channelName).setConsumer(properties);
|
||||
return properties;
|
||||
}
|
||||
} else {
|
||||
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
|
||||
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
|
||||
rbp.setConsumer(properties);
|
||||
bindings.put(channelName, rbp);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
|
||||
if (bindings.containsKey(channelName)) {
|
||||
if (bindings.get(channelName).getProducer() != null) {
|
||||
return bindings.get(channelName).getProducer();
|
||||
} else {
|
||||
RocketMQProducerProperties properties = new RocketMQProducerProperties();
|
||||
this.bindings.get(channelName).setProducer(properties);
|
||||
return properties;
|
||||
}
|
||||
} else {
|
||||
RocketMQProducerProperties properties = new RocketMQProducerProperties();
|
||||
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
|
||||
rbp.setProducer(properties);
|
||||
bindings.put(channelName, rbp);
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.properties;
|
||||
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQProducerProperties {
|
||||
|
||||
private Boolean enabled = true;
|
||||
|
||||
/**
|
||||
* Maximum allowed message size in bytes
|
||||
* {@link DefaultMQProducer#maxMessageSize}
|
||||
*/
|
||||
private Integer maxMessageSize = 0;
|
||||
|
||||
public Boolean getEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(Boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public Integer getMaxMessageSize() {
|
||||
return maxMessageSize;
|
||||
}
|
||||
|
||||
public void setMaxMessageSize(Integer maxMessageSize) {
|
||||
this.maxMessageSize = maxMessageSize;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,105 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq.provisioning;
|
||||
|
||||
import org.apache.rocketmq.client.Validators;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningException;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQTopicProvisioner
|
||||
implements
|
||||
ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties>> {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RocketMQTopicProvisioner.class);
|
||||
|
||||
@Override
|
||||
public ProducerDestination provisionProducerDestination(String name,
|
||||
ExtendedProducerProperties<RocketMQProducerProperties>
|
||||
properties)
|
||||
throws ProvisioningException {
|
||||
checkTopic(name);
|
||||
return new RocketProducerDestination(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumerDestination provisionConsumerDestination(String name, String group,
|
||||
ExtendedConsumerProperties<RocketMQConsumerProperties>
|
||||
properties)
|
||||
throws ProvisioningException {
|
||||
checkTopic(name);
|
||||
return new RocketConsumerDestination(name);
|
||||
}
|
||||
|
||||
private void checkTopic(String topic) {
|
||||
try {
|
||||
Validators.checkTopic(topic);
|
||||
} catch (MQClientException e) {
|
||||
logger.error("topic check error: " + topic, e);
|
||||
throw new AssertionError(e); // Can't happen
|
||||
}
|
||||
}
|
||||
|
||||
private static final class RocketProducerDestination implements ProducerDestination {
|
||||
|
||||
private final String producerDestinationName;
|
||||
|
||||
RocketProducerDestination(String destinationName) {
|
||||
this.producerDestinationName = destinationName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return producerDestinationName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNameForPartition(int partition) {
|
||||
return producerDestinationName;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static final class RocketConsumerDestination implements ConsumerDestination {
|
||||
|
||||
private final String consumerDestinationName;
|
||||
|
||||
RocketConsumerDestination(String consumerDestinationName) {
|
||||
this.consumerDestinationName = consumerDestinationName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return this.consumerDestinationName;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1 @@
|
||||
rocketmq:org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration
|
@ -0,0 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration
|
@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rocketmq;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQAutoConfigurationTests {
|
||||
|
||||
private ConfigurableApplicationContext context;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
this.context = new SpringApplicationBuilder(
|
||||
RocketMQBinderEndpointAutoConfiguration.class,
|
||||
RocketMQBinderAutoConfiguration.class).web(false).run(
|
||||
"--spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876",
|
||||
"--spring.cloud.stream.bindings.output.destination=TopicOrderTest",
|
||||
"--spring.cloud.stream.bindings.output.content-type=application/json",
|
||||
"--spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
|
||||
"--spring.cloud.stream.bindings.input1.content-type=application/json",
|
||||
"--spring.cloud.stream.bindings.input1.group=test-group1",
|
||||
"--spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true",
|
||||
"--spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",
|
||||
"--spring.cloud.stream.bindings.input2.destination=TopicOrderTest",
|
||||
"--spring.cloud.stream.bindings.input2.content-type=application/json",
|
||||
"--spring.cloud.stream.bindings.input2.group=test-group2",
|
||||
"--spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false",
|
||||
"--spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProperties() {
|
||||
RocketMQBinderConfigurationProperties binderConfigurationProperties = context
|
||||
.getBean(RocketMQBinderConfigurationProperties.class);
|
||||
assertThat(binderConfigurationProperties.getNamesrvAddr())
|
||||
.isEqualTo("127.0.0.1:9876");
|
||||
RocketMQExtendedBindingProperties bindingProperties = context
|
||||
.getBean(RocketMQExtendedBindingProperties.class);
|
||||
assertThat(bindingProperties.getExtendedConsumerProperties("input2").getTags())
|
||||
.isEqualTo("tag1");
|
||||
assertThat(bindingProperties.getExtendedConsumerProperties("input2").getOrderly())
|
||||
.isFalse();
|
||||
assertThat(bindingProperties.getExtendedConsumerProperties("input1").getOrderly())
|
||||
.isTrue();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue