Code refactoring and some new feature support

pull/2013/head
zkzlx 4 years ago
parent 68c25bc9a7
commit cbd64061ce

@ -98,6 +98,7 @@
<curator.version>4.0.1</curator.version> <curator.version>4.0.1</curator.version>
<!-- Apache RocketMQ --> <!-- Apache RocketMQ -->
<rocketmq.version>4.6.1</rocketmq.version>
<rocketmq.starter.version>2.0.2</rocketmq.starter.version> <rocketmq.starter.version>2.0.2</rocketmq.starter.version>
<!-- Maven Plugin Versions --> <!-- Maven Plugin Versions -->
@ -258,10 +259,20 @@
</exclusions> </exclusions>
</dependency> </dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.rocketmq</groupId>-->
<!-- <artifactId>rocketmq-spring-boot-starter</artifactId>-->
<!-- <version>${rocketmq.starter.version}</version>-->
<!-- </dependency>-->
<dependency> <dependency>
<groupId>org.apache.rocketmq</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId> <artifactId>rocketmq-client</artifactId>
<version>${rocketmq.starter.version}</version> <version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>${rocketmq.version}</version>
</dependency> </dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>

@ -50,9 +50,12 @@
<dependency> <dependency>
<groupId>org.apache.rocketmq</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId> <artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>

@ -1,89 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq;
import java.util.Arrays;
import java.util.List;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public final class RocketMQBinderUtils {
private RocketMQBinderUtils() {
}
public static RocketMQBinderConfigurationProperties mergeProperties(
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
RocketMQProperties rocketMQProperties) {
RocketMQBinderConfigurationProperties result = new RocketMQBinderConfigurationProperties();
if (StringUtils.isEmpty(rocketMQProperties.getNameServer())) {
result.setNameServer(rocketBinderConfigurationProperties.getNameServer());
}
else {
result.setNameServer(
Arrays.asList(rocketMQProperties.getNameServer().split(";")));
}
if (rocketMQProperties.getProducer() == null
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) {
result.setAccessKey(rocketBinderConfigurationProperties.getAccessKey());
}
else {
result.setAccessKey(rocketMQProperties.getProducer().getAccessKey());
}
if (rocketMQProperties.getProducer() == null
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getSecretKey())) {
result.setSecretKey(rocketBinderConfigurationProperties.getSecretKey());
}
else {
result.setSecretKey(rocketMQProperties.getProducer().getSecretKey());
}
if (rocketMQProperties.getProducer() == null || StringUtils
.isEmpty(rocketMQProperties.getProducer().getCustomizedTraceTopic())) {
result.setCustomizedTraceTopic(
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
}
else {
result.setCustomizedTraceTopic(
rocketMQProperties.getProducer().getCustomizedTraceTopic());
}
if (rocketMQProperties.getProducer() != null
&& rocketMQProperties.getProducer().isEnableMsgTrace()) {
result.setEnableMsgTrace(Boolean.TRUE);
}
else {
result.setEnableMsgTrace(
rocketBinderConfigurationProperties.isEnableMsgTrace());
}
return result;
}
public static String getNameServerStr(List<String> nameServerList) {
if (CollectionUtils.isEmpty(nameServerList)) {
return null;
}
return String.join(";", nameServerList);
}
}

@ -1,11 +1,11 @@
/* /*
* Copyright 2013-2018 the original author or authors. * Copyright (C) 2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* https://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -16,34 +16,18 @@
package com.alibaba.cloud.stream.binder.rocketmq; package com.alibaba.cloud.stream.binder.rocketmq;
import java.util.Collection; import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import java.util.Collections; import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
import java.util.HashMap; import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter;
import java.util.Map; import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull.DefaultErrorAcknowledgeHandler;
import java.util.Set; import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull.RocketMQMessageSource;
import com.alibaba.cloud.stream.binder.rocketmq.integration.outbound.RocketMQProducerMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
@ -55,15 +39,19 @@ import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback; import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.core.MessageProducer; import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
/** /**
* A {@link org.springframework.cloud.stream.binder.Binder} that uses RocketMQ as the
* underlying middleware.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class RocketMQMessageChannelBinder extends public class RocketMQMessageChannelBinder extends
@ -71,121 +59,45 @@ public class RocketMQMessageChannelBinder extends
implements implements
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> { ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties(); private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQBinderConfigurationProperties binderConfigurationProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final RocketMQProperties rocketMQProperties;
private final InstrumentationManager instrumentationManager; public RocketMQMessageChannelBinder(
RocketMQBinderConfigurationProperties binderConfigurationProperties,
private Map<String, String> topicInUse = new HashMap<>();
public RocketMQMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, RocketMQTopicProvisioner provisioningProvider) {
RocketMQProperties rocketMQProperties, super(new String[0], provisioningProvider);
InstrumentationManager instrumentationManager) {
super(null, provisioningProvider);
this.extendedBindingProperties = extendedBindingProperties; this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.binderConfigurationProperties = binderConfigurationProperties;
this.rocketMQProperties = rocketMQProperties;
this.instrumentationManager = instrumentationManager;
} }
@Override @Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination, protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties, ExtendedProducerProperties<RocketMQProducerProperties> extendedProducerProperties,
MessageChannel channel, MessageChannel errorChannel) throws Exception { MessageChannel channel, MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) { if (!extendedProducerProperties.getExtension().getEnabled()) {
throw new RuntimeException("Binding for channel " + destination.getName()
// if producerGroup is empty, using destination + " has been disabled, message can't be delivered");
String extendedProducerGroup = producerProperties.getExtension().getGroup();
String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
? destination.getName() : extendedProducerGroup;
RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
.mergeProperties(rocketBinderConfigurationProperties,
rocketMQProperties);
RocketMQTemplate rocketMQTemplate;
if (producerProperties.getExtension().getTransactional()) {
Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
.getBeansOfType(RocketMQTemplate.class);
if (rocketMQTemplates.size() == 0) {
throw new IllegalStateException(
"there is no RocketMQTemplate in Spring BeanFactory");
}
else if (rocketMQTemplates.size() > 1) {
throw new IllegalStateException(
"there is more than 1 RocketMQTemplates in Spring BeanFactory");
}
rocketMQTemplate = rocketMQTemplates.values().iterator().next();
}
else {
rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setObjectMapper(this.getApplicationContext()
.getBeansOfType(ObjectMapper.class).values().iterator().next());
DefaultMQProducer producer;
String ak = mergedProperties.getAccessKey();
String sk = mergedProperties.getSecretKey();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
RPCHook rpcHook = new AclClientRPCHook(
new SessionCredentials(ak, sk));
producer = new DefaultMQProducer(producerGroup, rpcHook,
mergedProperties.isEnableMsgTrace(),
mergedProperties.getCustomizedTraceTopic());
producer.setVipChannelEnabled(false);
producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
destination.getName() + "|" + UtilAll.getPid()));
}
else {
producer = new DefaultMQProducer(producerGroup);
producer.setVipChannelEnabled(
producerProperties.getExtension().getVipChannelEnabled());
}
producer.setNamesrvAddr(RocketMQBinderUtils
.getNameServerStr(mergedProperties.getNameServer()));
producer.setSendMsgTimeout(
producerProperties.getExtension().getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(
producerProperties.getExtension().getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerProperties
.getExtension().getRetryTimesWhenSendAsyncFailed());
producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(
producerProperties.getExtension().isRetryNextServer());
producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize());
rocketMQTemplate.setProducer(producer);
if (producerProperties.isPartitioned()) {
rocketMQTemplate
.setMessageQueueSelector(new PartitionMessageQueueSelector());
}
} }
RocketMQProducerProperties mqProducerProperties = RocketMQUtils
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( .mergeRocketMQProperties(binderConfigurationProperties,
rocketMQTemplate, destination.getName(), producerGroup, extendedProducerProperties.getExtension());
producerProperties.getExtension().getTransactional(), RocketMQProducerMessageHandler messageHandler = new RocketMQProducerMessageHandler(
instrumentationManager, producerProperties, destination, extendedProducerProperties, mqProducerProperties);
((AbstractMessageChannel) channel).getInterceptors().stream().filter( messageHandler.setApplicationContext(this.getApplicationContext());
channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
.map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
.findFirst().orElse(null));
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
messageHandler.setSync(producerProperties.getExtension().getSync());
messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
if (errorChannel != null) { if (errorChannel != null) {
messageHandler.setSendFailureChannel(errorChannel); messageHandler.setSendFailureChannel(errorChannel);
} }
MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor = ((AbstractMessageChannel) channel)
.getInterceptors().stream()
.filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
.map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
.findFirst().orElse(null);
messageHandler.setPartitioningInterceptor(partitioningInterceptor);
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
messageHandler.setErrorMessageStrategy(this.getErrorMessageStrategy());
return messageHandler; return messageHandler;
} }
else {
throw new RuntimeException("Binding for channel " + destination.getName()
+ " has been disabled, message can't be delivered");
}
}
@Override @Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination, protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
@ -198,56 +110,43 @@ public class RocketMQMessageChannelBinder extends
@Override @Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
String group, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties)
throws Exception { throws Exception {
if (group == null || "".equals(group)) { // todo support anymous consumer
if (StringUtils.isEmpty(group)) {
throw new RuntimeException( throw new RuntimeException(
"'group must be configured for channel " + destination.getName()); "'group must be configured for channel " + destination.getName());
} }
RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties,
extendedConsumerProperties.getExtension());
extendedConsumerProperties.getExtension().setGroup(group);
RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer( RocketMQInboundChannelAdapter inboundChannelAdapter = new RocketMQInboundChannelAdapter(
consumerProperties, rocketBinderConfigurationProperties, this); destination.getName(), extendedConsumerProperties);
listenerContainer.setConsumerGroup(group);
listenerContainer.setTopic(destination.getName());
listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());
listenerContainer.setSuspendCurrentQueueTimeMillis(
consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());
listenerContainer.setDelayLevelWhenNextConsume(
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
listenerContainer
.setNameServer(rocketBinderConfigurationProperties.getNameServer());
listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
listenerContainer, consumerProperties, instrumentationManager);
topicInUse.put(destination.getName(), group);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
group, consumerProperties); group, extendedConsumerProperties);
if (consumerProperties.getMaxAttempts() > 1) { if (extendedConsumerProperties.getMaxAttempts() > 1) {
rocketInboundChannelAdapter inboundChannelAdapter
.setRetryTemplate(buildRetryTemplate(consumerProperties)); .setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
rocketInboundChannelAdapter inboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
.setRecoveryCallback(errorInfrastructure.getRecoverer());
} }
else { else {
rocketInboundChannelAdapter inboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
.setErrorChannel(errorInfrastructure.getErrorChannel());
} }
return inboundChannelAdapter;
return rocketInboundChannelAdapter;
} }
@Override @Override
protected PolledConsumerResources createPolledConsumerResources(String name, protected PolledConsumerResources createPolledConsumerResources(String name,
String group, ConsumerDestination destination, String group, ConsumerDestination destination,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) { ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
RocketMQMessageSource rocketMQMessageSource = new RocketMQMessageSource( RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties,
rocketBinderConfigurationProperties, consumerProperties, name, group); extendedConsumerProperties.getExtension());
return new PolledConsumerResources(rocketMQMessageSource, extendedConsumerProperties.getExtension().setGroup(group);
registerErrorInfrastructure(destination, group, consumerProperties, RocketMQMessageSource messageSource = new RocketMQMessageSource(name,
true)); extendedConsumerProperties);
return new PolledConsumerResources(messageSource, registerErrorInfrastructure(
destination, group, extendedConsumerProperties, true));
} }
@Override @Override
@ -261,67 +160,47 @@ public class RocketMQMessageChannelBinder extends
((MessagingException) message.getPayload()) ((MessagingException) message.getPayload())
.getFailedMessage()); .getFailedMessage());
if (ack != null) { if (ack != null) {
if (properties.getExtension().shouldRequeue()) { ErrorAcknowledgeHandler handler = RocketMQBeanContainerCache.getBean(
ack.acknowledge(Status.REQUEUE); properties.getExtension().getPull().getErrAcknowledge(),
} ErrorAcknowledgeHandler.class,
else { new DefaultErrorAcknowledgeHandler());
ack.acknowledge(Status.REJECT); ack.acknowledge(
} handler.handler(((MessagingException) message.getPayload())
.getFailedMessage()));
} }
} }
}; };
} }
/**
* Binders can return an {@link ErrorMessageStrategy} for building error messages;
* binder implementations typically might add extra headers to the error message.
*
* @return the implementation - may be null.
*/
@Override @Override
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { protected ErrorMessageStrategy getErrorMessageStrategy() {
return extendedBindingProperties.getExtendedConsumerProperties(channelName); // It can be extended to custom if necessary.
return new DefaultErrorMessageStrategy();
} }
@Override @Override
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedProducerProperties(channelName); return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
} }
public Map<String, String> getTopicInUse() { @Override
return topicInUse; public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
} }
@Override @Override
public String getDefaultsPrefix() { public String getDefaultsPrefix() {
return extendedBindingProperties.getDefaultsPrefix(); return this.extendedBindingProperties.getDefaultsPrefix();
} }
@Override @Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() { public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return extendedBindingProperties.getExtendedPropertiesEntryClass(); return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
}
public void setExtendedBindingProperties(
RocketMQExtendedBindingProperties extendedBindingProperties) {
this.extendedBindingProperties = extendedBindingProperties;
}
private RocketMQHeaderMapper createHeaderMapper(
final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
Set<String> trustedPackages = extendedConsumerProperties.getExtension()
.getTrustedPackages();
return createHeaderMapper(trustedPackages);
}
private RocketMQHeaderMapper createHeaderMapper(
final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties) {
return createHeaderMapper(Collections.emptyList());
} }
private RocketMQHeaderMapper createHeaderMapper(Collection<String> trustedPackages) {
ObjectMapper objectMapper = this.getApplicationContext()
.getBeansOfType(ObjectMapper.class).values().iterator().next();
JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(
objectMapper);
if (!StringUtils.isEmpty(trustedPackages)) {
headerMapper.addTrustedPackages(trustedPackages);
}
return headerMapper;
}
} }

@ -19,7 +19,6 @@ package com.alibaba.cloud.stream.binder.rocketmq.actuator;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation; import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
@ -29,23 +28,20 @@ import org.springframework.boot.actuate.health.Health;
*/ */
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
@Autowired
private InstrumentationManager instrumentationManager;
@Override @Override
protected void doHealthCheck(Health.Builder builder) throws Exception { protected void doHealthCheck(Health.Builder builder) throws Exception {
if (instrumentationManager.getHealthInstrumentations().stream() if (InstrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isUp)) { .allMatch(Instrumentation::isUp)) {
builder.up(); builder.up();
return; return;
} }
if (instrumentationManager.getHealthInstrumentations().stream() if (InstrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isOutOfService)) { .allMatch(Instrumentation::isOutOfService)) {
builder.outOfService(); builder.outOfService();
return; return;
} }
builder.down(); builder.down();
instrumentationManager.getHealthInstrumentations().stream() InstrumentationManager.getHealthInstrumentations().stream()
.filter(instrumentation -> !instrumentation.isStarted()) .filter(instrumentation -> !instrumentation.isStarted())
.forEach(instrumentation1 -> builder .forEach(instrumentation1 -> builder
.withException(instrumentation1.getStartException())); .withException(instrumentation1.getStartException()));

@ -1,465 +1,470 @@
/* /// *
* Copyright 2013-2018 the original author or authors. // * Copyright 2013-2018 the original author or authors.
* // *
* Licensed under the Apache License, Version 2.0 (the "License"); // * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. // * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at // * You may obtain a copy of the License at
* // *
* https://www.apache.org/licenses/LICENSE-2.0 // * https://www.apache.org/licenses/LICENSE-2.0
* // *
* Unless required by applicable law or agreed to in writing, software // * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, // * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and // * See the License for the specific language governing permissions and
* limitations under the License. // * limitations under the License.
*/ // */
//
package com.alibaba.cloud.stream.binder.rocketmq.consuming; // package com.alibaba.cloud.stream.binder.rocketmq.consuming;
//
import java.util.List; // import java.util.List;
import java.util.Objects; // import java.util.Objects;
//
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils; // import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; // import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; // import
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; /// com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; // import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.apache.rocketmq.acl.common.AclClientRPCHook; // import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import org.apache.rocketmq.acl.common.SessionCredentials; // import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; // import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.MessageSelector; // import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; // import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; // import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; // import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; // import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; // import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; // import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; // import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException; // import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.UtilAll; // import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; // import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.RPCHook; // import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode; // import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.annotation.MessageModel; // import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.SelectorType; // import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.core.RocketMQListener; // import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; // import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.support.RocketMQListenerContainer; // import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.apache.rocketmq.spring.support.RocketMQUtil; // import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
import org.slf4j.Logger; // import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.LoggerFactory; // import org.slf4j.Logger;
// import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; //
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; // import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle; // import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.integration.support.MessageBuilder; // import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message; // import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert; // import org.springframework.messaging.Message;
import org.springframework.util.StringUtils; // import org.springframework.util.Assert;
// import org.springframework.util.StringUtils;
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES; //
// import static
/** /// com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
* A class that Listen on rocketmq message. //
* <p> /// **
* this class will delegate {@link RocketMQListener} to handle message // * A class that Listen on rocketmq message.
* // * <p>
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> // * this class will delegate {@link RocketMQListener} to handle message
* @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a> // *
* @see RocketMQListener // * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ // * @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a>
public class RocketMQListenerBindingContainer // * @see RocketMQListener
implements InitializingBean, RocketMQListenerContainer, SmartLifecycle { // */
// public class RocketMQListenerBindingContainer
private final static Logger log = LoggerFactory // implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
.getLogger(RocketMQListenerBindingContainer.class); //
// private final static Logger log = LoggerFactory
private long suspendCurrentQueueTimeMillis = 1000; // .getLogger(RocketMQListenerBindingContainer.class);
//
/** // private long suspendCurrentQueueTimeMillis = 1000;
* Message consume retry strategy<br> //
* -1,no retry,put into DLQ directly<br> // /**
* 0,broker control retry frequency<br> // * Message consume retry strategy<br>
* >0,client control retry frequency. // * -1,no retry,put into DLQ directly<br>
*/ // * 0,broker control retry frequency<br>
private int delayLevelWhenNextConsume = 0; // * >0,client control retry frequency.
// */
private List<String> nameServer; // private int delayLevelWhenNextConsume = 0;
//
private String consumerGroup; // private List<String> nameServer;
//
private String topic; // private String consumerGroup;
//
private int consumeThreadMax = 64; // private String topic;
//
private String charset = "UTF-8"; // private int consumeThreadMax = 64;
//
private RocketMQListener rocketMQListener; // private String charset = "UTF-8";
//
private RocketMQHeaderMapper headerMapper; // private RocketMQListener rocketMQListener;
//
private DefaultMQPushConsumer consumer; // private RocketMQHeaderMapper headerMapper;
//
private boolean running; // private DefaultMQPushConsumer consumer;
//
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties; // private boolean running;
//
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder; // private final ExtendedConsumerProperties<RocketMQConsumerProperties>
/// rocketMQConsumerProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; //
// private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
// The following properties came from RocketMQConsumerProperties. //
private ConsumeMode consumeMode; // private final RocketMQBinderConfigurationProperties
/// rocketBinderConfigurationProperties;
private SelectorType selectorType; //
// // The following properties came from RocketMQConsumerProperties.
private String selectorExpression; // private ConsumeMode consumeMode;
//
private MessageModel messageModel; // private SelectorType selectorType;
//
public RocketMQListenerBindingContainer( // private String selectorExpression;
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties, //
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, // private MessageModel messageModel;
RocketMQMessageChannelBinder rocketMQMessageChannelBinder) { //
this.rocketMQConsumerProperties = rocketMQConsumerProperties; // public RocketMQListenerBindingContainer(
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; // ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder; // RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly() // RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY; // this.rocketMQConsumerProperties = rocketMQConsumerProperties;
if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) { // this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.selectorType = SelectorType.TAG; // this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags(); // this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
} // ? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY;
else { // if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
this.selectorType = SelectorType.SQL92; // this.selectorType = SelectorType.TAG;
this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql(); // this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
} // }
this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting() // else {
? MessageModel.BROADCASTING : MessageModel.CLUSTERING; // this.selectorType = SelectorType.SQL92;
} // this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
// }
@Override // this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
public void setupMessageListener(RocketMQListener<?> rocketMQListener) { // ? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
this.rocketMQListener = rocketMQListener; // }
} //
// @Override
@Override // public void setupMessageListener(RocketMQListener<?> rocketMQListener) {
public void destroy() throws Exception { // this.rocketMQListener = rocketMQListener;
this.setRunning(false); // }
if (Objects.nonNull(consumer)) { //
consumer.shutdown(); // @Override
} // public void destroy() throws Exception {
log.info("container destroyed, {}", this.toString()); // this.setRunning(false);
} // if (Objects.nonNull(consumer)) {
// consumer.shutdown();
@Override // }
public void afterPropertiesSet() throws Exception { // log.info("container destroyed, {}", this.toString());
initRocketMQPushConsumer(); // }
} //
// @Override
@Override // public void afterPropertiesSet() throws Exception {
public boolean isAutoStartup() { // initRocketMQPushConsumer();
return true; // }
} //
// @Override
@Override // public boolean isAutoStartup() {
public void stop(Runnable callback) { // return true;
stop(); // }
callback.run(); //
} // @Override
// public void stop(Runnable callback) {
@Override // stop();
public void start() { // callback.run();
if (this.isRunning()) { // }
throw new IllegalStateException( //
"container already running. " + this.toString()); // @Override
} // public void start() {
// if (this.isRunning()) {
try { // throw new IllegalStateException(
consumer.start(); // "container already running. " + this.toString());
} // }
catch (MQClientException e) { //
throw new IllegalStateException("Failed to start RocketMQ push consumer", e); // try {
} // consumer.start();
this.setRunning(true); // }
// catch (MQClientException e) {
log.info("running container: {}", this.toString()); // throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
} // }
// this.setRunning(true);
@Override //
public void stop() { // log.info("running container: {}", this.toString());
if (this.isRunning()) { // }
if (Objects.nonNull(consumer)) { //
consumer.shutdown(); // @Override
} // public void stop() {
setRunning(false); // if (this.isRunning()) {
} // if (Objects.nonNull(consumer)) {
} // consumer.shutdown();
// }
@Override // setRunning(false);
public boolean isRunning() { // }
return running; // }
} //
// @Override
private void setRunning(boolean running) { // public boolean isRunning() {
this.running = running; // return running;
} // }
//
@Override // private void setRunning(boolean running) {
public int getPhase() { // this.running = running;
return Integer.MAX_VALUE; // }
} //
// @Override
private void initRocketMQPushConsumer() throws MQClientException { // public int getPhase() {
Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); // return Integer.MAX_VALUE;
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); // }
Assert.notNull(nameServer, "Property 'nameServer' is required"); //
Assert.notNull(topic, "Property 'topic' is required"); // private void initRocketMQPushConsumer() throws MQClientException {
// Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
String ak = rocketBinderConfigurationProperties.getAccessKey(); // Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
String sk = rocketBinderConfigurationProperties.getSecretKey(); // Assert.notNull(nameServer, "Property 'nameServer' is required");
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { // Assert.notNull(topic, "Property 'topic' is required");
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk)); //
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, // String ak = rocketBinderConfigurationProperties.getAccessKey();
new AllocateMessageQueueAveragely(), // String sk = rocketBinderConfigurationProperties.getSecretKey();
rocketBinderConfigurationProperties.isEnableMsgTrace(), // if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
rocketBinderConfigurationProperties.getCustomizedTraceTopic()); // RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, // consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
topic + "|" + UtilAll.getPid())); // new AllocateMessageQueueAveragely(),
consumer.setVipChannelEnabled(false); // rocketBinderConfigurationProperties.isEnableMsgTrace(),
} // rocketBinderConfigurationProperties.getCustomizedTraceTopic());
else { // consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
consumer = new DefaultMQPushConsumer(consumerGroup, // topic + "|" + UtilAll.getPid()));
rocketBinderConfigurationProperties.isEnableMsgTrace(), // consumer.setVipChannelEnabled(false);
rocketBinderConfigurationProperties.getCustomizedTraceTopic()); // }
} // else {
// consumer = new DefaultMQPushConsumer(consumerGroup,
consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer)); // rocketBinderConfigurationProperties.isEnableMsgTrace(),
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); // rocketBinderConfigurationProperties.getCustomizedTraceTopic());
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); // }
//
switch (messageModel) { // consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer));
case BROADCASTING: // consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
consumer.setMessageModel( // consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); //
break; // switch (messageModel) {
case CLUSTERING: // case BROADCASTING:
consumer.setMessageModel( // consumer.setMessageModel(
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); // org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break; // break;
default: // case CLUSTERING:
throw new IllegalArgumentException("Property 'messageModel' was wrong."); // consumer.setMessageModel(
} // org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
// break;
switch (selectorType) { // default:
case TAG: // throw new IllegalArgumentException("Property 'messageModel' was wrong.");
consumer.subscribe(topic, selectorExpression); // }
break; //
case SQL92: // switch (selectorType) {
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); // case TAG:
break; // consumer.subscribe(topic, selectorExpression);
default: // break;
throw new IllegalArgumentException("Property 'selectorType' was wrong."); // case SQL92:
} // consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
// break;
switch (consumeMode) { // default:
case ORDERLY: // throw new IllegalArgumentException("Property 'selectorType' was wrong.");
consumer.setMessageListener(new DefaultMessageListenerOrderly()); // }
break; //
case CONCURRENTLY: // switch (consumeMode) {
consumer.setMessageListener(new DefaultMessageListenerConcurrently()); // case ORDERLY:
break; // consumer.setMessageListener(new DefaultMessageListenerOrderly());
default: // break;
throw new IllegalArgumentException("Property 'consumeMode' was wrong."); // case CONCURRENTLY:
} // consumer.setMessageListener(new DefaultMessageListenerConcurrently());
// break;
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { // default:
((RocketMQPushConsumerLifecycleListener) rocketMQListener) // throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
.prepareStart(consumer); // }
} //
// if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
} // ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
// .prepareStart(consumer);
@Override // }
public String toString() { //
return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup // }
+ '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' //
+ ", consumeMode=" + consumeMode + ", selectorType=" + selectorType // @Override
+ ", selectorExpression='" + selectorExpression + '\'' + ", messageModel=" // public String toString() {
+ messageModel + '}'; // return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
} // + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
// + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
public long getSuspendCurrentQueueTimeMillis() { // + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
return suspendCurrentQueueTimeMillis; // + messageModel + '}';
} // }
//
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { // public long getSuspendCurrentQueueTimeMillis() {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; // return suspendCurrentQueueTimeMillis;
} // }
//
public int getDelayLevelWhenNextConsume() { // public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
return delayLevelWhenNextConsume; // this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
} // }
//
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { // public int getDelayLevelWhenNextConsume() {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; // return delayLevelWhenNextConsume;
} // }
//
public List<String> getNameServer() { // public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
return nameServer; // this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
} // }
//
public void setNameServer(List<String> nameServer) { // public List<String> getNameServer() {
this.nameServer = nameServer; // return nameServer;
} // }
//
public String getConsumerGroup() { // public void setNameServer(List<String> nameServer) {
return consumerGroup; // this.nameServer = nameServer;
} // }
//
public void setConsumerGroup(String consumerGroup) { // public String getConsumerGroup() {
this.consumerGroup = consumerGroup; // return consumerGroup;
} // }
//
public String getTopic() { // public void setConsumerGroup(String consumerGroup) {
return topic; // this.consumerGroup = consumerGroup;
} // }
//
public void setTopic(String topic) { // public String getTopic() {
this.topic = topic; // return topic;
} // }
//
public int getConsumeThreadMax() { // public void setTopic(String topic) {
return consumeThreadMax; // this.topic = topic;
} // }
//
public void setConsumeThreadMax(int consumeThreadMax) { // public int getConsumeThreadMax() {
this.consumeThreadMax = consumeThreadMax; // return consumeThreadMax;
} // }
//
public String getCharset() { // public void setConsumeThreadMax(int consumeThreadMax) {
return charset; // this.consumeThreadMax = consumeThreadMax;
} // }
//
public void setCharset(String charset) { // public String getCharset() {
this.charset = charset; // return charset;
} // }
//
public RocketMQListener getRocketMQListener() { // public void setCharset(String charset) {
return rocketMQListener; // this.charset = charset;
} // }
//
public void setRocketMQListener(RocketMQListener rocketMQListener) { // public RocketMQListener getRocketMQListener() {
this.rocketMQListener = rocketMQListener; // return rocketMQListener;
} // }
//
public DefaultMQPushConsumer getConsumer() { // public void setRocketMQListener(RocketMQListener rocketMQListener) {
return consumer; // this.rocketMQListener = rocketMQListener;
} // }
//
public void setConsumer(DefaultMQPushConsumer consumer) { // public DefaultMQPushConsumer getConsumer() {
this.consumer = consumer; // return consumer;
} // }
//
public ExtendedConsumerProperties<RocketMQConsumerProperties> getRocketMQConsumerProperties() { // public void setConsumer(DefaultMQPushConsumer consumer) {
return rocketMQConsumerProperties; // this.consumer = consumer;
} // }
//
public ConsumeMode getConsumeMode() { // public ExtendedConsumerProperties<RocketMQConsumerProperties>
return consumeMode; /// getRocketMQConsumerProperties() {
} // return rocketMQConsumerProperties;
// }
public SelectorType getSelectorType() { //
return selectorType; // public ConsumeMode getConsumeMode() {
} // return consumeMode;
// }
public String getSelectorExpression() { //
return selectorExpression; // public SelectorType getSelectorType() {
} // return selectorType;
// }
public MessageModel getMessageModel() { //
return messageModel; // public String getSelectorExpression() {
} // return selectorExpression;
// }
public RocketMQHeaderMapper getHeaderMapper() { //
return headerMapper; // public MessageModel getMessageModel() {
} // return messageModel;
// }
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) { //
this.headerMapper = headerMapper; // public RocketMQHeaderMapper getHeaderMapper() {
} // return headerMapper;
// }
/** //
* Convert rocketmq {@link MessageExt} to Spring {@link Message}. // public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
* @param messageExt the rocketmq message // this.headerMapper = headerMapper;
* @return the converted Spring {@link Message} // }
*/ //
@SuppressWarnings("unchecked") // /**
private Message convertToSpringMessage(MessageExt messageExt) { // * Convert rocketmq {@link MessageExt} to Spring {@link Message}.
// * @param messageExt the rocketmq message
// add reconsume-times header to messageExt // * @return the converted Spring {@link Message}
int reconsumeTimes = messageExt.getReconsumeTimes(); // */
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, // @SuppressWarnings("unchecked")
String.valueOf(reconsumeTimes)); // private Message convertToSpringMessage(MessageExt messageExt) {
Message message = RocketMQUtil.convertToSpringMessage(messageExt); //
return MessageBuilder.fromMessage(message) // // add reconsume-times header to messageExt
.copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build(); // int reconsumeTimes = messageExt.getReconsumeTimes();
} // messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
// String.valueOf(reconsumeTimes));
public class DefaultMessageListenerConcurrently // Message message = RocketMQUtil.convertToSpringMessage(messageExt);
implements MessageListenerConcurrently { // return MessageBuilder.fromMessage(message)
// .copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
@SuppressWarnings({ "unchecked", "Duplicates" }) // }
@Override //
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, // public class DefaultMessageListenerConcurrently
ConsumeConcurrentlyContext context) { // implements MessageListenerConcurrently {
for (MessageExt messageExt : msgs) { //
log.debug("received msg: {}", messageExt); // @SuppressWarnings({ "unchecked", "Duplicates" })
try { // @Override
long now = System.currentTimeMillis(); // public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
rocketMQListener.onMessage(convertToSpringMessage(messageExt)); // ConsumeConcurrentlyContext context) {
long costTime = System.currentTimeMillis() - now; // for (MessageExt messageExt : msgs) {
log.debug("consume {} message key:[{}] cost: {} ms", // log.debug("received msg: {}", messageExt);
messageExt.getMsgId(), messageExt.getKeys(), costTime); // try {
} // long now = System.currentTimeMillis();
catch (Exception e) { // rocketMQListener.onMessage(convertToSpringMessage(messageExt));
log.warn("consume message failed. messageExt:{}", messageExt, e); // long costTime = System.currentTimeMillis() - now;
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); // log.debug("consume {} message key:[{}] cost: {} ms",
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // messageExt.getMsgId(), messageExt.getKeys(), costTime);
} // }
} // catch (Exception e) {
// log.warn("consume message failed. messageExt:{}", messageExt, e);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
} // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
// }
} // }
//
public class DefaultMessageListenerOrderly implements MessageListenerOrderly { // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// }
@SuppressWarnings({ "unchecked", "Duplicates" }) //
@Override // }
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, //
ConsumeOrderlyContext context) { // public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
for (MessageExt messageExt : msgs) { //
log.debug("received msg: {}", messageExt); // @SuppressWarnings({ "unchecked", "Duplicates" })
try { // @Override
long now = System.currentTimeMillis(); // public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
rocketMQListener.onMessage(convertToSpringMessage(messageExt)); // ConsumeOrderlyContext context) {
long costTime = System.currentTimeMillis() - now; // for (MessageExt messageExt : msgs) {
log.info("consume {} message key:[{}] cost: {} ms", // log.debug("received msg: {}", messageExt);
messageExt.getMsgId(), messageExt.getKeys(), costTime); // try {
} // long now = System.currentTimeMillis();
catch (Exception e) { // rocketMQListener.onMessage(convertToSpringMessage(messageExt));
log.warn("consume message failed. messageExt:{}", messageExt, e); // long costTime = System.currentTimeMillis() - now;
context.setSuspendCurrentQueueTimeMillis( // log.info("consume {} message key:[{}] cost: {} ms",
suspendCurrentQueueTimeMillis); // messageExt.getMsgId(), messageExt.getKeys(), costTime);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; // }
} // catch (Exception e) {
} // log.warn("consume message failed. messageExt:{}", messageExt, e);
// context.setSuspendCurrentQueueTimeMillis(
return ConsumeOrderlyStatus.SUCCESS; // suspendCurrentQueueTimeMillis);
} // return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
// }
} // }
//
} // return ConsumeOrderlyStatus.SUCCESS;
// }
//
// }
//
// }

@ -1,176 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private static final Logger log = LoggerFactory
.getLogger(RocketMQInboundChannelAdapter.class);
private RetryTemplate retryTemplate;
private RecoveryCallback<? extends Object> recoveryCallback;
private RocketMQListenerBindingContainer rocketMQListenerContainer;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
private final InstrumentationManager instrumentationManager;
public RocketMQInboundChannelAdapter(
RocketMQListenerBindingContainer rocketMQListenerContainer,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
InstrumentationManager instrumentationManager) {
this.rocketMQListenerContainer = rocketMQListenerContainer;
this.consumerProperties = consumerProperties;
this.instrumentationManager = instrumentationManager;
}
@Override
protected void onInit() {
if (consumerProperties == null
|| !consumerProperties.getExtension().getEnabled()) {
return;
}
super.onInit();
if (this.retryTemplate != null) {
Assert.state(getErrorChannel() == null,
"Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
+ "send an error message when retries are exhausted");
}
BindingRocketMQListener listener = new BindingRocketMQListener();
rocketMQListenerContainer.setRocketMQListener(listener);
if (retryTemplate != null) {
this.retryTemplate.registerListener(listener);
}
try {
rocketMQListenerContainer.afterPropertiesSet();
}
catch (Exception e) {
log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
throw new IllegalArgumentException(
"rocketMQListenerContainer init error: " + e.getMessage(), e);
}
instrumentationManager.addHealthInstrumentation(
new Instrumentation(rocketMQListenerContainer.getTopic()
+ rocketMQListenerContainer.getConsumerGroup()));
}
@Override
protected void doStart() {
if (consumerProperties == null
|| !consumerProperties.getExtension().getEnabled()) {
return;
}
try {
rocketMQListenerContainer.start();
instrumentationManager
.getHealthInstrumentation(rocketMQListenerContainer.getTopic()
+ rocketMQListenerContainer.getConsumerGroup())
.markStartedSuccessfully();
}
catch (Exception e) {
instrumentationManager
.getHealthInstrumentation(rocketMQListenerContainer.getTopic()
+ rocketMQListenerContainer.getConsumerGroup())
.markStartFailed(e);
log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
throw new MessagingException(MessageBuilder.withPayload(
"RocketMQTemplate startup failed, Caused by " + e.getMessage())
.build(), e);
}
}
@Override
protected void doStop() {
rocketMQListenerContainer.stop();
}
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
protected class BindingRocketMQListener
implements RocketMQListener<Message>, RetryListener {
@Override
public void onMessage(Message message) {
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
if (enableRetry) {
RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
RocketMQInboundChannelAdapter.this.sendMessage(message);
return null;
}, (RecoveryCallback<Object>) RocketMQInboundChannelAdapter.this.recoveryCallback);
}
else {
RocketMQInboundChannelAdapter.this.sendMessage(message);
}
}
@Override
public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
}
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
}
}
}

@ -1,302 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQMessageHandler.class);
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
private MessageChannel sendFailureChannel;
private final RocketMQTemplate rocketMQTemplate;
private RocketMQHeaderMapper headerMapper;
private final Boolean transactional;
private final String destination;
private final String groupName;
private final InstrumentationManager instrumentationManager;
private boolean sync = false;
private volatile boolean running = false;
private ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;
public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination,
String groupName, Boolean transactional,
InstrumentationManager instrumentationManager,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) {
this.rocketMQTemplate = rocketMQTemplate;
this.destination = destination;
this.groupName = groupName;
this.transactional = transactional;
this.instrumentationManager = instrumentationManager;
this.producerProperties = producerProperties;
this.partitioningInterceptor = partitioningInterceptor;
}
@Override
public void start() {
if (!transactional) {
instrumentationManager
.addHealthInstrumentation(new Instrumentation(destination));
try {
rocketMQTemplate.afterPropertiesSet();
instrumentationManager.getHealthInstrumentation(destination)
.markStartedSuccessfully();
}
catch (Exception e) {
instrumentationManager.getHealthInstrumentation(destination)
.markStartFailed(e);
log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
throw new MessagingException(MessageBuilder.withPayload(
"RocketMQTemplate startup failed, Caused by " + e.getMessage())
.build(), e);
}
}
if (producerProperties.isPartitioned()) {
try {
List<MessageQueue> messageQueues = rocketMQTemplate.getProducer()
.fetchPublishMessageQueues(destination);
if (producerProperties.getPartitionCount() != messageQueues.size()) {
logger.info(String.format(
"The partition count of topic '%s' will change from '%s' to '%s'",
destination, producerProperties.getPartitionCount(),
messageQueues.size()));
producerProperties.setPartitionCount(messageQueues.size());
partitioningInterceptor
.setPartitionCount(producerProperties.getPartitionCount());
}
}
catch (MQClientException e) {
logger.error("fetch publish message queues fail", e);
}
}
running = true;
}
@Override
public void stop() {
if (!transactional) {
rocketMQTemplate.destroy();
}
running = false;
}
@Override
public boolean isRunning() {
return running;
}
@Override
protected void handleMessageInternal(
org.springframework.messaging.Message<?> message) {
try {
// issue 737 fix
Map<String, String> jsonHeaders = headerMapper
.fromHeaders(message.getHeaders());
message = org.springframework.messaging.support.MessageBuilder
.fromMessage(message).copyHeaders(jsonHeaders).build();
final StringBuilder topicWithTags = new StringBuilder(destination);
String tags = Optional
.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
.toString();
if (!StringUtils.isEmpty(tags)) {
topicWithTags.append(":").append(tags);
}
SendResult sendRes = null;
if (transactional) {
sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
topicWithTags.toString(), message, message.getHeaders()
.get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
}
else {
int delayLevel = 0;
try {
Object delayLevelObj = message.getHeaders()
.getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
if (delayLevelObj instanceof Number) {
delayLevel = ((Number) delayLevelObj).intValue();
}
else if (delayLevelObj instanceof String) {
delayLevel = Integer.parseInt((String) delayLevelObj);
}
}
catch (Exception e) {
// ignore
}
boolean needSelectQueue = message.getHeaders()
.containsKey(BinderHeaders.PARTITION_HEADER);
if (sync) {
if (needSelectQueue) {
sendRes = rocketMQTemplate.syncSendOrderly(
topicWithTags.toString(), message, "",
rocketMQTemplate.getProducer().getSendMsgTimeout());
}
else {
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
message,
rocketMQTemplate.getProducer().getSendMsgTimeout(),
delayLevel);
}
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
}
else {
Message<?> finalMessage = message;
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.debug("async send to topic " + topicWithTags + " "
+ sendResult);
}
@Override
public void onException(Throwable e) {
log.error("RocketMQ Message hasn't been sent. Caused by "
+ e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(
RocketMQMessageHandler.this.errorMessageStrategy
.buildErrorMessage(new MessagingException(
finalMessage, e), null));
}
}
};
if (needSelectQueue) {
rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
message, "", sendCallback,
rocketMQTemplate.getProducer().getSendMsgTimeout());
}
else {
rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
sendCallback);
}
}
}
if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
if (getSendFailureChannel() != null) {
this.getSendFailureChannel().send(message);
}
else {
throw new MessagingException(message,
new MQClientException("message hasn't been sent", null));
}
}
}
catch (Exception e) {
log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(this.errorMessageStrategy
.buildErrorMessage(new MessagingException(message, e), null));
}
else {
throw new MessagingException(message, e);
}
}
}
/**
* Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent
* to this channel with a payload of a {@link MessagingException} with the failed
* message and cause.
* @param sendFailureChannel the failure channel.
* @since 0.2.2
*/
public void setSendFailureChannel(MessageChannel sendFailureChannel) {
this.sendFailureChannel = sendFailureChannel;
}
/**
* Set the error message strategy implementation to use when sending error messages
* after send failures. Cannot be null.
* @param errorMessageStrategy the implementation.
* @since 0.2.2
*/
public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
this.errorMessageStrategy = errorMessageStrategy;
}
public MessageChannel getSendFailureChannel() {
return sendFailureChannel;
}
public void setSync(boolean sync) {
this.sync = sync;
}
public RocketMQHeaderMapper getHeaderMapper() {
return headerMapper;
}
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}
}

@ -1,382 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration;
import java.util.List;
import java.util.Set;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageSource extends AbstractMessageSource<Object>
implements DisposableBean, Lifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQMessageSource.class);
private final RocketMQCallbackFactory ackCallbackFactory;
private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
private final String topic;
private final String group;
private final Object consumerMonitor = new Object();
private DefaultMQPullConsumer consumer;
private boolean running;
private MessageSelector messageSelector;
private RocketMQMessageQueueChooser messageQueueChooser = new RocketMQMessageQueueChooser();
public RocketMQMessageSource(
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
String topic, String group) {
this(new RocketMQCallbackFactory(), rocketMQBinderConfigurationProperties,
rocketMQConsumerProperties, topic, group);
}
public RocketMQMessageSource(RocketMQCallbackFactory ackCallbackFactory,
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
String topic, String group) {
this.ackCallbackFactory = ackCallbackFactory;
this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties;
this.rocketMQConsumerProperties = rocketMQConsumerProperties;
this.topic = topic;
this.group = group;
}
@Override
public synchronized void start() {
if (this.isRunning()) {
throw new IllegalStateException(
"pull consumer already running. " + this.toString());
}
try {
consumer = new DefaultMQPullConsumer(group);
consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(
rocketMQBinderConfigurationProperties.getNameServer()));
consumer.setConsumerPullTimeoutMillis(
rocketMQConsumerProperties.getExtension().getPullTimeout());
consumer.setMessageModel(MessageModel.CLUSTERING);
String tags = rocketMQConsumerProperties.getExtension().getTags();
String sql = rocketMQConsumerProperties.getExtension().getSql();
if (!StringUtils.isEmpty(tags) && !StringUtils.isEmpty(sql)) {
messageSelector = MessageSelector.byTag(tags);
}
else if (!StringUtils.isEmpty(tags)) {
messageSelector = MessageSelector.byTag(tags);
}
else if (!StringUtils.isEmpty(sql)) {
messageSelector = MessageSelector.bySql(sql);
}
consumer.registerMessageQueueListener(topic, new MessageQueueListener() {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
log.info(
"messageQueueChanged, topic='{}', mqAll=`{}`, mqDivided=`{}`",
topic, mqAll, mqDivided);
switch (consumer.getMessageModel()) {
case BROADCASTING:
RocketMQMessageSource.this.resetMessageQueues(mqAll);
break;
case CLUSTERING:
RocketMQMessageSource.this.resetMessageQueues(mqDivided);
break;
default:
break;
}
}
});
consumer.start();
}
catch (MQClientException e) {
log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e);
}
this.setRunning(true);
}
@Override
public synchronized void stop() {
if (this.isRunning()) {
this.setRunning(false);
consumer.shutdown();
}
}
@Override
public synchronized boolean isRunning() {
return running;
}
@Override
protected synchronized Object doReceive() {
if (messageQueueChooser.getMessageQueues() == null
|| messageQueueChooser.getMessageQueues().size() == 0) {
return null;
}
try {
int count = 0;
while (count < messageQueueChooser.getMessageQueues().size()) {
MessageQueue messageQueue;
synchronized (this.consumerMonitor) {
messageQueue = messageQueueChooser.choose();
messageQueueChooser.increment();
}
long offset = consumer.fetchConsumeOffset(messageQueue,
rocketMQConsumerProperties.getExtension().isFromStore());
log.debug("topic='{}', group='{}', messageQueue='{}', offset now='{}'",
this.topic, this.group, messageQueue, offset);
PullResult pullResult;
if (messageSelector != null) {
pullResult = consumer.pull(messageQueue, messageSelector, offset, 1);
}
else {
pullResult = consumer.pull(messageQueue, (String) null, offset, 1);
}
if (pullResult.getPullStatus() == PullStatus.FOUND) {
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
Message message = RocketMQUtil
.convertToSpringMessage(messageExtList.get(0));
AcknowledgmentCallback ackCallback = this.ackCallbackFactory
.createCallback(new RocketMQAckInfo(messageQueue, pullResult,
consumer, offset));
Message messageResult = MessageBuilder.fromMessage(message).setHeader(
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
ackCallback).build();
return messageResult;
}
else {
log.debug("messageQueue='{}' PullResult='{}' with topic `{}`",
messageQueueChooser.getMessageQueues(),
pullResult.getPullStatus(), topic);
}
count++;
}
}
catch (Exception e) {
log.error("Consumer pull error: " + e.getMessage(), e);
}
return null;
}
@Override
public String getComponentType() {
return "rocketmq:message-source";
}
public synchronized void setRunning(boolean running) {
this.running = running;
}
public synchronized void resetMessageQueues(Set<MessageQueue> queueSet) {
log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, queueSet);
synchronized (this.consumerMonitor) {
this.messageQueueChooser.reset(queueSet);
}
}
public static class RocketMQCallbackFactory
implements AcknowledgmentCallbackFactory<RocketMQAckInfo> {
@Override
public AcknowledgmentCallback createCallback(RocketMQAckInfo info) {
return new RocketMQAckCallback(info);
}
}
public static class RocketMQAckCallback implements AcknowledgmentCallback {
private final RocketMQAckInfo ackInfo;
private boolean acknowledged;
private boolean autoAckEnabled = true;
public RocketMQAckCallback(RocketMQAckInfo ackInfo) {
this.ackInfo = ackInfo;
}
protected void setAcknowledged(boolean acknowledged) {
this.acknowledged = acknowledged;
}
@Override
public boolean isAcknowledged() {
return this.acknowledged;
}
@Override
public void noAutoAck() {
this.autoAckEnabled = false;
}
@Override
public boolean isAutoAck() {
return this.autoAckEnabled;
}
@Override
public void acknowledge(Status status) {
Assert.notNull(status, "'status' cannot be null");
if (this.acknowledged) {
throw new IllegalStateException("Already acknowledged");
}
log.debug("acknowledge(" + status.name() + ") for " + this);
synchronized (this.ackInfo.getConsumerMonitor()) {
try {
switch (status) {
case ACCEPT:
case REJECT:
ackInfo.getConsumer().updateConsumeOffset(
ackInfo.getMessageQueue(),
ackInfo.getPullResult().getNextBeginOffset());
log.debug("messageQueue='{}' offset update to `{}`",
ackInfo.getMessageQueue(), String.valueOf(
ackInfo.getPullResult().getNextBeginOffset()));
break;
case REQUEUE:
// decrease index and update offset of messageQueue of ackInfo
int oldIndex = ackInfo.getMessageQueueChooser().requeue();
ackInfo.getConsumer().updateConsumeOffset(
ackInfo.getMessageQueue(), ackInfo.getOldOffset());
log.debug(
"messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'",
ackInfo.getMessageQueue(), oldIndex,
ackInfo.getOldOffset());
break;
default:
break;
}
}
catch (MQClientException e) {
log.error("acknowledge error: " + e.getErrorMessage(), e);
}
finally {
this.acknowledged = true;
}
}
}
@Override
public String toString() {
return "RocketMQAckCallback{" + "ackInfo=" + ackInfo + ", acknowledged="
+ acknowledged + ", autoAckEnabled=" + autoAckEnabled + '}';
}
}
public class RocketMQAckInfo {
private final MessageQueue messageQueue;
private final PullResult pullResult;
private final DefaultMQPullConsumer consumer;
private final long oldOffset;
public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult,
DefaultMQPullConsumer consumer, long oldOffset) {
this.messageQueue = messageQueue;
this.pullResult = pullResult;
this.consumer = consumer;
this.oldOffset = oldOffset;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public PullResult getPullResult() {
return pullResult;
}
public DefaultMQPullConsumer getConsumer() {
return consumer;
}
public RocketMQMessageQueueChooser getMessageQueueChooser() {
return RocketMQMessageSource.this.messageQueueChooser;
}
public long getOldOffset() {
return oldOffset;
}
public Object getConsumerMonitor() {
return RocketMQMessageSource.this.consumerMonitor;
}
@Override
public String toString() {
return "RocketMQAckInfo{" + "messageQueue=" + messageQueue + ", pullResult="
+ pullResult + ", consumer=" + consumer + ", oldOffset=" + oldOffset
+ '}';
}
}
}

@ -16,8 +16,11 @@
package com.alibaba.cloud.stream.binder.rocketmq.metrics; package com.alibaba.cloud.stream.binder.rocketmq.metrics;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.context.Lifecycle;
/** /**
* @author Timur Valiev * @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -25,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class Instrumentation { public class Instrumentation {
private final String name; private final String name;
private Lifecycle actuator;
protected final AtomicBoolean started = new AtomicBoolean(false); protected final AtomicBoolean started = new AtomicBoolean(false);
@ -34,6 +38,19 @@ public class Instrumentation {
this.name = name; this.name = name;
} }
public Instrumentation(String name, Lifecycle actuator) {
this.name = name;
this.actuator = actuator;
}
public Lifecycle getActuator() {
return actuator;
}
public void setActuator(Lifecycle actuator) {
this.actuator = actuator;
}
public boolean isDown() { public boolean isDown() {
return startException != null; return startException != null;
} }
@ -67,4 +84,8 @@ public class Instrumentation {
return startException; return startException;
} }
@Override
public int hashCode() {
return Objects.hash(getName(), getActuator());
}
} }

@ -16,37 +16,34 @@
package com.alibaba.cloud.stream.binder.rocketmq.metrics; package com.alibaba.cloud.stream.binder.rocketmq.metrics;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/** /**
* @author Timur Valiev * @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class InstrumentationManager { public final class InstrumentationManager {
private final Map<String, Object> runtime = new ConcurrentHashMap<>(); private static final Map<Integer, Instrumentation> HEALTH_INSTRUMENTATIONS = new HashMap<>();
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>(); public static Collection<Instrumentation> getHealthInstrumentations() {
return HEALTH_INSTRUMENTATIONS.values();
public Set<Instrumentation> getHealthInstrumentations() {
return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue)
.collect(Collectors.toSet());
} }
public void addHealthInstrumentation(Instrumentation instrumentation) { public static void addHealthInstrumentation(Instrumentation instrumentation) {
healthInstrumentations.put(instrumentation.getName(), instrumentation); if (null != instrumentation) {
HEALTH_INSTRUMENTATIONS.computeIfPresent(instrumentation.hashCode(),
(k, v) -> {
if (instrumentation.getActuator() != null) {
instrumentation.getActuator().stop();
} }
throw new IllegalArgumentException(
public Instrumentation getHealthInstrumentation(String key) { "The current actuator exists, please confirm if there is a repeat operation!!!");
return healthInstrumentations.get(key); });
} }
public Map<String, Object> getRuntime() {
return runtime;
} }
} }

@ -1,11 +1,11 @@
/* /*
* Copyright 2013-2018 the original author or authors. * Copyright (C) 2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* https://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -16,86 +16,13 @@
package com.alibaba.cloud.stream.binder.rocketmq.properties; package com.alibaba.cloud.stream.binder.rocketmq.properties;
import java.util.Arrays;
import java.util.List;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
/** /**
* @author Timur Valiev * binding rocketMq properties.
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties { public class RocketMQBinderConfigurationProperties extends RocketMQCommonProperties {
/**
* The name server list for rocketMQ.
*/
private List<String> nameServer = Arrays
.asList(RocketMQBinderConstants.DEFAULT_NAME_SERVER);
/**
* The property of "access-key".
*/
private String accessKey;
/**
* The property of "secret-key".
*/
private String secretKey;
/**
* Switch flag instance for message trace.
*/
private boolean enableMsgTrace = true;
/**
* The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
public List<String> getNameServer() {
return nameServer;
}
public void setNameServer(List<String> nameServer) {
this.nameServer = nameServer;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
} }

@ -1,49 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.properties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQBindingProperties implements BinderSpecificPropertiesProvider {
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
private RocketMQProducerProperties producer = new RocketMQProducerProperties();
@Override
public RocketMQConsumerProperties getConsumer() {
return consumer;
}
public void setConsumer(RocketMQConsumerProperties consumer) {
this.consumer = consumer;
}
@Override
public RocketMQProducerProperties getProducer() {
return producer;
}
public void setProducer(RocketMQProducerProperties producer) {
this.producer = producer;
}
}

@ -1,11 +1,11 @@
/* /*
* Copyright 2013-2018 the original author or authors. * Copyright (C) 2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* https://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -16,109 +16,315 @@
package com.alibaba.cloud.stream.binder.rocketmq.properties; package com.alibaba.cloud.stream.binder.rocketmq.properties;
import java.util.Set; import java.io.Serializable;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/** /**
* @author Timur Valiev * Extended consumer properties for RocketMQ binder.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class RocketMQConsumerProperties { public class RocketMQConsumerProperties extends RocketMQCommonProperties {
/** /**
* using '||' to split tag {@link MQPushConsumer#subscribe(String, String)}. * Message model defines the way how messages are delivered to each consumer clients.
* </p>
*
* This field defaults to clustering.
*/ */
private String tags; private String messageModel = MessageModel.CLUSTERING.getModeCN();
/** /**
* {@link MQPushConsumer#subscribe(String, MessageSelector)} * Queue allocation algorithm specifying how message queues are allocated to each
* {@link MessageSelector#bySql(String)}. * consumer clients.
*/ */
private String sql; private String allocateMessageQueueStrategy;
/** /**
* {@link MessageModel#BROADCASTING}. * The expressions include tags or SQL,as follow:
* <p/>
* tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
* <p/>
* Determines whether there are specific characters "{@code ||}" in the expression to
* determine how the message is filtered,tags or SQL.
*/ */
private Boolean broadcasting = false; private String subscription;
/** /**
* if orderly is true, using {@link MessageListenerOrderly} else if orderly if false, * Delay some time when exception occur
* using {@link MessageListenerConcurrently}.
*/ */
private Boolean orderly = false; private long pullTimeDelayMillsWhenException = 1000;
/** /**
* for concurrently listener. message consume retry strategy. see * Consuming point on consumer booting.
* {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}. -1 means dlq(or * </p>
* discard, see {@link this#shouldRequeue}), others means requeue. *
* There are three consuming points:
* <ul>
* <li><code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it
* stopped previously. If it were a newly booting up consumer client, according aging
* of the consumer group, there are two cases:
* <ol>
* <li>if the consumer group is created so recently that the earliest message being
* subscribed has yet expired, which means the consumer group represents a lately
* launched business, consuming will start from the very beginning;</li>
* <li>if the earliest message being subscribed has expired, consuming will start from
* the latest messages, meaning messages born prior to the booting timestamp would be
* ignored.</li>
* </ol>
* </li>
* <li><code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from
* earliest messages available.</li>
* <li><code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified
* timestamp, which means messages born prior to {@link #consumeTimestamp} will be
* ignored</li>
* </ul>
*/ */
private int delayLevelWhenNextConsume = 0; private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
* Backtracking consumption time with second precision. Time format is
* 20131223171201<br>
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
* Default backtracking consumption time Half an hour ago.
*/
private String consumeTimestamp = UtilAll
.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
/** /**
* for orderly listener. next retry delay time. * Flow control threshold on queue level, each message queue will cache at most 1000
* messages by default, Consider the {@link #pullBatchSize}, the instantaneous value
* may exceed the limit
*/ */
private long suspendCurrentQueueTimeMillis = 1000; private int pullThresholdForQueue = 1000;
/**
* Limit the cached message size on queue level, each message queue will cache at most
* 100 MiB messages by default, Consider the {@link #pullBatchSize}, the instantaneous
* value may exceed the limit
*
* <p>
* The size of a message only measured by message body, so it's not accurate
*/
private int pullThresholdSizeForQueue = 100;
private Boolean enabled = true; /**
* Maximum number of messages pulled each time.
*/
private int pullBatchSize = 10;
/** /**
* {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)}. * Consume max span offset.it has no effect on sequential consumption.
*/ */
private Set<String> trustedPackages; private int consumeMaxSpan = 2000;
private Push push = new Push();
private Pull pull = new Pull();
public String getMessageModel() {
return messageModel;
}
public RocketMQConsumerProperties setMessageModel(String messageModel) {
this.messageModel = messageModel;
return this;
}
public String getAllocateMessageQueueStrategy() {
return allocateMessageQueueStrategy;
}
public void setAllocateMessageQueueStrategy(String allocateMessageQueueStrategy) {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
public String getSubscription() {
return subscription;
}
public void setSubscription(String subscription) {
this.subscription = subscription;
}
public Push getPush() {
return push;
}
public void setPush(Push push) {
this.push = push;
}
public long getPullTimeDelayMillsWhenException() {
return pullTimeDelayMillsWhenException;
}
public RocketMQConsumerProperties setPullTimeDelayMillsWhenException(
long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
return this;
}
public ConsumeFromWhere getConsumeFromWhere() {
return consumeFromWhere;
}
// ------------ For Pull Consumer ------------ public RocketMQConsumerProperties setConsumeFromWhere(
ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
return this;
}
private long pullTimeout = 10 * 1000; public String getConsumeTimestamp() {
return consumeTimestamp;
}
private boolean fromStore; public RocketMQConsumerProperties setConsumeTimestamp(String consumeTimestamp) {
this.consumeTimestamp = consumeTimestamp;
return this;
}
// ------------ For Pull Consumer ------------ public int getPullThresholdForQueue() {
return pullThresholdForQueue;
}
public RocketMQConsumerProperties setPullThresholdForQueue(
int pullThresholdForQueue) {
this.pullThresholdForQueue = pullThresholdForQueue;
return this;
}
public int getPullThresholdSizeForQueue() {
return pullThresholdSizeForQueue;
}
public RocketMQConsumerProperties setPullThresholdSizeForQueue(
int pullThresholdSizeForQueue) {
this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
return this;
}
public String getTags() { public int getPullBatchSize() {
return tags; return pullBatchSize;
} }
public void setTags(String tags) { public RocketMQConsumerProperties setPullBatchSize(int pullBatchSize) {
this.tags = tags; this.pullBatchSize = pullBatchSize;
return this;
} }
public String getSql() { public Pull getPull() {
return sql; return pull;
} }
public void setSql(String sql) { public RocketMQConsumerProperties setPull(Pull pull) {
this.sql = sql; this.pull = pull;
return this;
} }
public Boolean getOrderly() { public int getConsumeMaxSpan() {
return consumeMaxSpan;
}
public RocketMQConsumerProperties setConsumeMaxSpan(int consumeMaxSpan) {
this.consumeMaxSpan = consumeMaxSpan;
return this;
}
public static class Push implements Serializable {
private static final long serialVersionUID = -7398468554978817630L;
/**
* if orderly is true, using {@link MessageListenerOrderly} else if orderly if
* false, using {@link MessageListenerConcurrently}.
*/
private boolean orderly = false;
/**
* Suspending pulling time for cases requiring slow pulling like flow-control
* scenario. see{@link ConsumeMessageOrderlyService#processConsumeResult}.
* see{@link ConsumeOrderlyContext#getSuspendCurrentQueueTimeMillis}.
*/
private int suspendCurrentQueueTimeMillis = 1000;
/**
* https://github.com/alibaba/spring-cloud-alibaba/issues/1866 Max re-consume
* times. -1 means 16 times.
* </p>
* If messages are re-consumed more than {@link #maxReconsumeTimes} before
* success, it's be directed to a deletion queue waiting.
*/
private int maxReconsumeTimes;
/**
* for concurrently listener. message consume retry strategy. -1 means dlq(or
* discard. see {@link ConsumeMessageConcurrentlyService#processConsumeResult}.
* see {@link ConsumeConcurrentlyContext#getDelayLevelWhenNextConsume}.
*/
private int delayLevelWhenNextConsume = 0;
/**
* Flow control threshold on topic level, default value is -1(Unlimited)
* <p>
* The value of {@code pullThresholdForQueue} will be overwrote and calculated
* based on {@code pullThresholdForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdForTopic is 1000 and 10 message
* queues are assigned to this consumer, then pullThresholdForQueue will be set to
* 100.
*/
private int pullThresholdForTopic = -1;
/**
* Limit the cached message size on topic level, default value is -1
* MiB(Unlimited)
* <p>
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated
* based on {@code pullThresholdSizeForTopic} if it is't unlimited
* <p>
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10
* message queues are assigned to this consumer, then pullThresholdSizeForQueue
* will be set to 100 MiB
*/
private int pullThresholdSizeForTopic = -1;
/**
* Message pull Interval
*/
private long pullInterval = 0;
/**
* Batch consumption size
*/
private int consumeMessageBatchMaxSize = 1;
public boolean getOrderly() {
return orderly; return orderly;
} }
public void setOrderly(Boolean orderly) { public void setOrderly(boolean orderly) {
this.orderly = orderly; this.orderly = orderly;
} }
public Boolean getEnabled() { public int getSuspendCurrentQueueTimeMillis() {
return enabled; return suspendCurrentQueueTimeMillis;
} }
public void setEnabled(Boolean enabled) { public void setSuspendCurrentQueueTimeMillis(int suspendCurrentQueueTimeMillis) {
this.enabled = enabled; this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
} }
public Boolean getBroadcasting() { public int getMaxReconsumeTimes() {
return broadcasting; return maxReconsumeTimes;
} }
public void setBroadcasting(Boolean broadcasting) { public void setMaxReconsumeTimes(int maxReconsumeTimes) {
this.broadcasting = broadcasting; this.maxReconsumeTimes = maxReconsumeTimes;
} }
public int getDelayLevelWhenNextConsume() { public int getDelayLevelWhenNextConsume() {
@ -129,40 +335,117 @@ public class RocketMQConsumerProperties {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
} }
public long getSuspendCurrentQueueTimeMillis() { public int getPullThresholdForTopic() {
return suspendCurrentQueueTimeMillis; return pullThresholdForTopic;
} }
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { public void setPullThresholdForTopic(int pullThresholdForTopic) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; this.pullThresholdForTopic = pullThresholdForTopic;
}
public int getPullThresholdSizeForTopic() {
return pullThresholdSizeForTopic;
}
public void setPullThresholdSizeForTopic(int pullThresholdSizeForTopic) {
this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
}
public long getPullInterval() {
return pullInterval;
}
public void setPullInterval(long pullInterval) {
this.pullInterval = pullInterval;
}
public int getConsumeMessageBatchMaxSize() {
return consumeMessageBatchMaxSize;
} }
public long getPullTimeout() { public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
return pullTimeout; this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
} }
}
public static class Pull implements Serializable {
/**
* The poll timeout in milliseconds
*/
private long pollTimeoutMillis = 1000 * 5;
/**
* Pull thread number
*/
private int pullThreadNums = 20;
/**
* Interval time in in milliseconds for checking changes in topic metadata.
*/
private long topicMetadataCheckIntervalMillis = 30 * 1000;
/**
* Long polling mode, the Consumer connection timeout(must greater than
* brokerSuspendMaxTimeMillis), it is not recommended to modify
*/
private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
/**
* Ack state handling, including receive, reject, and retry, when a consumption
* exception occurs. see {@link }
*/
private String errAcknowledge;
public void setPullTimeout(long pullTimeout) { private long pullThresholdForAll = 1000L;
this.pullTimeout = pullTimeout;
public long getPollTimeoutMillis() {
return pollTimeoutMillis;
}
public void setPollTimeoutMillis(long pollTimeoutMillis) {
this.pollTimeoutMillis = pollTimeoutMillis;
}
public int getPullThreadNums() {
return pullThreadNums;
}
public void setPullThreadNums(int pullThreadNums) {
this.pullThreadNums = pullThreadNums;
} }
public boolean isFromStore() { public long getTopicMetadataCheckIntervalMillis() {
return fromStore; return topicMetadataCheckIntervalMillis;
} }
public void setFromStore(boolean fromStore) { public void setTopicMetadataCheckIntervalMillis(
this.fromStore = fromStore; long topicMetadataCheckIntervalMillis) {
this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
} }
public boolean shouldRequeue() { public long getConsumerTimeoutMillisWhenSuspend() {
return delayLevelWhenNextConsume != -1; return consumerTimeoutMillisWhenSuspend;
} }
public Set<String> getTrustedPackages() { public void setConsumerTimeoutMillisWhenSuspend(
return trustedPackages; long consumerTimeoutMillisWhenSuspend) {
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
} }
public void setTrustedPackages(Set<String> trustedPackages) { public String getErrAcknowledge() {
this.trustedPackages = trustedPackages; return errAcknowledge;
}
public void setErrAcknowledge(String errAcknowledge) {
this.errAcknowledge = errAcknowledge;
}
public long getPullThresholdForAll() {
return pullThresholdForAll;
}
public void setPullThresholdForAll(long pullThresholdForAll) {
this.pullThresholdForAll = pullThresholdForAll;
}
} }
} }

@ -1,11 +1,11 @@
/* /*
* Copyright 2013-2018 the original author or authors. * Copyright (C) 2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* https://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -21,12 +21,14 @@ import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
/** /**
* @author Timur Valiev * rocketMQ specific extended binding properties class that extends from
* {@link AbstractExtendedBindingProperties}.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@ConfigurationProperties("spring.cloud.stream.rocketmq") @ConfigurationProperties("spring.cloud.stream.rocketmq")
public class RocketMQExtendedBindingProperties extends public class RocketMQExtendedBindingProperties extends
AbstractExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties, RocketMQBindingProperties> { AbstractExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties, RocketMQSpecificPropertiesProvider> {
private static final String DEFAULTS_PREFIX = "spring.cloud.stream.rocketmq.default"; private static final String DEFAULTS_PREFIX = "spring.cloud.stream.rocketmq.default";
@ -37,7 +39,7 @@ public class RocketMQExtendedBindingProperties extends
@Override @Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() { public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return RocketMQBindingProperties.class; return RocketMQSpecificPropertiesProvider.class;
} }
} }

@ -1,11 +1,11 @@
/* /*
* Copyright 2013-2018 the original author or authors. * Copyright (C) 2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* https://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -16,55 +16,39 @@
package com.alibaba.cloud.stream.binder.rocketmq.properties; package com.alibaba.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
/** /**
* @author Timur Valiev * Extended producer properties for RocketMQ binder.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
public class RocketMQProducerProperties { public class RocketMQProducerProperties extends RocketMQCommonProperties {
private Boolean enabled = true;
/**
* Name of producer.
*/
private String group;
/**
* Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}.
*/
private Integer maxMessageSize = 1024 * 1024 * 4;
private Boolean transactional = false;
private Boolean sync = false;
private Boolean vipChannelEnabled = true;
/** /**
* Millis of send message timeout. * Timeout for sending messages.
*/ */
private int sendMessageTimeout = 3000; private int sendMsgTimeout = 3000;
/** /**
* Compress message body threshold, namely, message body larger than 4k will be * Compress message body threshold, namely, message body larger than 4k will be
* compressed on default. * compressed on default.
*/ */
private int compressMessageBodyThreshold = 1024 * 4; private int compressMsgBodyThreshold = 1024 * 4;
/** /**
* Maximum number of retry to perform internally before claiming sending failure in * Maximum number of retry to perform internally before claiming sending failure in
* synchronous mode. This may potentially cause message duplication which is up to * synchronous mode.
* application developers to resolve. * </p>
*
* This may potentially cause message duplication which is up to application
* developers to resolve.
*/ */
private int retryTimesWhenSendFailed = 2; private int retryTimesWhenSendFailed = 2;
/** /**
* <p>
* Maximum number of retry to perform internally before claiming sending failure in * Maximum number of retry to perform internally before claiming sending failure in
* asynchronous mode. * asynchronous mode.
* </p> * </p>
*
* This may potentially cause message duplication which is up to application * This may potentially cause message duplication which is up to application
* developers to resolve. * developers to resolve.
*/ */
@ -73,94 +57,165 @@ public class RocketMQProducerProperties {
/** /**
* Indicate whether to retry another broker on sending failure internally. * Indicate whether to retry another broker on sending failure internally.
*/ */
private boolean retryNextServer = false; private boolean retryAnotherBroker = false;
public String getGroup() { /**
return group; * Maximum allowed message size in bytes.
*/
private int maxMessageSize = 1024 * 1024 * 4;
private String producerType = ProducerType.Normal.name();
private String sendType = SendType.Sync.name();
private String sendCallBack;
private String transactionListener;
private String messageQueueSelector;
private String errorMessageStrategy;
private String sendFailureChannel;
private String checkForbiddenHook;
private String sendMessageHook;
public int getSendMsgTimeout() {
return sendMsgTimeout;
}
public void setSendMsgTimeout(int sendMsgTimeout) {
this.sendMsgTimeout = sendMsgTimeout;
} }
public void setGroup(String group) { public int getCompressMsgBodyThreshold() {
this.group = group; return compressMsgBodyThreshold;
} }
public Boolean getEnabled() { public void setCompressMsgBodyThreshold(int compressMsgBodyThreshold) {
return enabled; this.compressMsgBodyThreshold = compressMsgBodyThreshold;
} }
public void setEnabled(Boolean enabled) { public int getRetryTimesWhenSendFailed() {
this.enabled = enabled; return retryTimesWhenSendFailed;
} }
public Integer getMaxMessageSize() { public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
public int getRetryTimesWhenSendAsyncFailed() {
return retryTimesWhenSendAsyncFailed;
}
public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
public boolean getRetryAnotherBroker() {
return retryAnotherBroker;
}
public void setRetryAnotherBroker(boolean retryAnotherBroker) {
this.retryAnotherBroker = retryAnotherBroker;
}
public int getMaxMessageSize() {
return maxMessageSize; return maxMessageSize;
} }
public void setMaxMessageSize(Integer maxMessageSize) { public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
} }
public Boolean getTransactional() { public String getProducerType() {
return transactional; return producerType;
} }
public void setTransactional(Boolean transactional) { public void setProducerType(String producerType) {
this.transactional = transactional; this.producerType = producerType;
} }
public Boolean getSync() { public String getSendType() {
return sync; return sendType;
} }
public void setSync(Boolean sync) { public void setSendType(String sendType) {
this.sync = sync; this.sendType = sendType;
} }
public Boolean getVipChannelEnabled() { public String getSendCallBack() {
return vipChannelEnabled; return sendCallBack;
} }
public void setVipChannelEnabled(Boolean vipChannelEnabled) { public void setSendCallBack(String sendCallBack) {
this.vipChannelEnabled = vipChannelEnabled; this.sendCallBack = sendCallBack;
} }
public int getSendMessageTimeout() { public String getTransactionListener() {
return sendMessageTimeout; return transactionListener;
} }
public void setSendMessageTimeout(int sendMessageTimeout) { public void setTransactionListener(String transactionListener) {
this.sendMessageTimeout = sendMessageTimeout; this.transactionListener = transactionListener;
} }
public int getCompressMessageBodyThreshold() { public String getMessageQueueSelector() {
return compressMessageBodyThreshold; return messageQueueSelector;
} }
public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) { public void setMessageQueueSelector(String messageQueueSelector) {
this.compressMessageBodyThreshold = compressMessageBodyThreshold; this.messageQueueSelector = messageQueueSelector;
} }
public int getRetryTimesWhenSendFailed() { public String getErrorMessageStrategy() {
return retryTimesWhenSendFailed; return errorMessageStrategy;
} }
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { public void setErrorMessageStrategy(String errorMessageStrategy) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; this.errorMessageStrategy = errorMessageStrategy;
} }
public int getRetryTimesWhenSendAsyncFailed() { public String getSendFailureChannel() {
return retryTimesWhenSendAsyncFailed; return sendFailureChannel;
} }
public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) { public void setSendFailureChannel(String sendFailureChannel) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; this.sendFailureChannel = sendFailureChannel;
}
public String getCheckForbiddenHook() {
return checkForbiddenHook;
}
public void setCheckForbiddenHook(String checkForbiddenHook) {
this.checkForbiddenHook = checkForbiddenHook;
} }
public boolean isRetryNextServer() { public String getSendMessageHook() {
return retryNextServer; return sendMessageHook;
} }
public void setRetryNextServer(boolean retryNextServer) { public void setSendMessageHook(String sendMessageHook) {
this.retryNextServer = retryNextServer; this.sendMessageHook = sendMessageHook;
}
public enum ProducerType {
Normal, Trans;
public boolean equalsName(String name) {
return this.name().equalsIgnoreCase(name);
}
}
public enum SendType {
OneWay, Async, Sync,;
public boolean equalsName(String name) {
return this.name().equalsIgnoreCase(name);
}
} }
} }

@ -36,7 +36,7 @@ public class PartitionMessageQueueSelector implements MessageQueueSelector {
@Override @Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer partition = 0; int partition = 0;
try { try {
partition = Math.abs( partition = Math.abs(
Integer.parseInt(msg.getProperty(BinderHeaders.PARTITION_HEADER))); Integer.parseInt(msg.getProperty(BinderHeaders.PARTITION_HEADER)));

Loading…
Cancel
Save