Merge pull request #611 from fangjian0423/binder-dev

[RocketMQ Binder] Support PolledConsumer
pull/1014/head
format 6 years ago committed by GitHub
commit 38880a363c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,10 +1,15 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQConsumerApplication.MySink;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.messaging.SubscribableChannel;
/**
@ -27,10 +32,36 @@ public class RocketMQConsumerApplication {
@Input("input4")
SubscribableChannel input4();
@Input("input5")
PollableMessageSource input5();
}
public static void main(String[] args) {
SpringApplication.run(RocketMQConsumerApplication.class, args);
}
@Bean
public ConsumerCustomRunner customRunner() {
return new ConsumerCustomRunner();
}
public static class ConsumerCustomRunner implements CommandLineRunner {
@Autowired
private MySink mySink;
@Override
public void run(String... args) throws InterruptedException {
while (true) {
mySink.input5().poll(m -> {
String payload = (String) m.getPayload();
System.out.println("pull msg: " + payload);
}, new ParameterizedTypeReference<String>() {
});
Thread.sleep(2_000);
}
}
}
}

@ -24,6 +24,10 @@ spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=5
spring.cloud.stream.bindings.input5.destination=pull-topic
spring.cloud.stream.bindings.input5.content-type=text/plain
spring.cloud.stream.bindings.input5.group=pull-topic-group
spring.application.name=rocketmq-consume-example
server.port=28082

@ -9,6 +9,7 @@ import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -23,6 +24,9 @@ public class RocketMQProduceApplication {
@Output("output2")
MessageChannel output2();
@Output("output3")
MessageChannel output3();
}
public static void main(String[] args) {
@ -31,7 +35,12 @@ public class RocketMQProduceApplication {
@Bean
public CustomRunner customRunner() {
return new CustomRunner();
return new CustomRunner("output1");
}
@Bean
public CustomRunner customRunner2() {
return new CustomRunner("output3");
}
@Bean
@ -40,24 +49,45 @@ public class RocketMQProduceApplication {
}
public static class CustomRunner implements CommandLineRunner {
private final String bindingName;
public CustomRunner(String bindingName) {
this.bindingName = bindingName;
}
@Autowired
private SenderService senderService;
@Autowired
private MySource mySource;
@Override
public void run(String... args) throws Exception {
int count = 5;
for (int index = 1; index <= count; index++) {
String msgContent = "msg-" + index;
if (index % 3 == 0) {
senderService.send(msgContent);
if (this.bindingName.equals("output1")) {
int count = 5;
for (int index = 1; index <= count; index++) {
String msgContent = "msg-" + index;
if (index % 3 == 0) {
senderService.send(msgContent);
}
else if (index % 3 == 1) {
senderService.sendWithTags(msgContent, "tagStr");
}
else {
senderService.sendObject(new Foo(index, "foo"), "tagObj");
}
}
else if (index % 3 == 1) {
senderService.sendWithTags(msgContent, "tagStr");
}
else {
senderService.sendObject(new Foo(index, "foo"), "tagObj");
}
else if (this.bindingName.equals("output3")) {
int count = 50;
for (int index = 1; index <= count; index++) {
String msgContent = "pullMsg-" + index;
mySource.output3()
.send(MessageBuilder.withPayload(msgContent).build());
}
}
}
}

@ -12,6 +12,10 @@ spring.cloud.stream.bindings.output2.content-type=application/json
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group
spring.application.name=rocketmq-produce-example
server.port=28081

@ -34,6 +34,7 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
@ -42,9 +43,13 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProdu
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -83,7 +88,7 @@ public class RocketMQMessageChannelBinder extends
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
// if producerGroup is empty, using destination
// if producerGroup is empty, using destination
String extendedProducerGroup = producerProperties.getExtension().getGroup();
String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
? destination.getName()
@ -206,6 +211,39 @@ public class RocketMQMessageChannelBinder extends
return rocketInboundChannelAdapter;
}
@Override
protected PolledConsumerResources createPolledConsumerResources(String name,
String group, ConsumerDestination destination,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
RocketMQMessageSource rocketMQMessageSource = new RocketMQMessageSource(
rocketBinderConfigurationProperties, consumerProperties, name, group);
return new PolledConsumerResources(rocketMQMessageSource,
registerErrorInfrastructure(destination, group, consumerProperties,
true));
}
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
ConsumerDestination destination, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties> properties) {
return message -> {
if (message.getPayload() instanceof MessagingException) {
AcknowledgmentCallback ack = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(
((MessagingException) message.getPayload())
.getFailedMessage());
if (ack != null) {
if (properties.getExtension().shouldRequeue()) {
ack.acknowledge(Status.REQUEUE);
}
else {
ack.acknowledge(Status.REJECT);
}
}
}
};
}
@Override
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName);

@ -0,0 +1,61 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageQueueChooser {
private volatile int queueIndex = 0;
private volatile List<MessageQueue> messageQueues;
public MessageQueue choose() {
return messageQueues.get(queueIndex);
}
public int requeue() {
if (queueIndex - 1 < 0) {
this.queueIndex = messageQueues.size() - 1;
}
else {
this.queueIndex = this.queueIndex - 1;
}
return this.queueIndex;
}
public void increment() {
this.queueIndex = (this.queueIndex + 1) % messageQueues.size();
}
public void reset(Set<MessageQueue> queueSet) {
this.messageQueues = null;
this.messageQueues = new ArrayList<>(queueSet);
this.queueIndex = 0;
}
public List<MessageQueue> getMessageQueues() {
return messageQueues;
}
}

@ -0,0 +1,378 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.integration;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageSource extends AbstractMessageSource<Object>
implements DisposableBean, Lifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQMessageSource.class);
private final RocketMQCallbackFactory ackCallbackFactory;
private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
private final String topic;
private final String group;
private final Object consumerMonitor = new Object();
private DefaultMQPullConsumer consumer;
private boolean running;
private MessageSelector messageSelector;
private RocketMQMessageQueueChooser messageQueueChooser = new RocketMQMessageQueueChooser();
public RocketMQMessageSource(
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
String topic, String group) {
this(new RocketMQCallbackFactory(), rocketMQBinderConfigurationProperties,
rocketMQConsumerProperties, topic, group);
}
public RocketMQMessageSource(RocketMQCallbackFactory ackCallbackFactory,
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
String topic, String group) {
this.ackCallbackFactory = ackCallbackFactory;
this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties;
this.rocketMQConsumerProperties = rocketMQConsumerProperties;
this.topic = topic;
this.group = group;
}
@Override
public synchronized void start() {
if (this.isRunning()) {
throw new IllegalStateException(
"pull consumer already running. " + this.toString());
}
try {
consumer = new DefaultMQPullConsumer(group);
consumer.setNamesrvAddr(
rocketMQBinderConfigurationProperties.getNameServer());
consumer.setConsumerPullTimeoutMillis(
rocketMQConsumerProperties.getExtension().getPullTimeout());
consumer.setMessageModel(MessageModel.CLUSTERING);
String tags = rocketMQConsumerProperties.getExtension().getTags();
String sql = rocketMQConsumerProperties.getExtension().getSql();
if (!StringUtils.isEmpty(tags) && !StringUtils.isEmpty(sql)) {
messageSelector = MessageSelector.byTag(tags);
}
else if (!StringUtils.isEmpty(tags)) {
messageSelector = MessageSelector.byTag(tags);
}
else if (!StringUtils.isEmpty(sql)) {
messageSelector = MessageSelector.bySql(sql);
}
consumer.registerMessageQueueListener(topic, new MessageQueueListener() {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
log.info(
"messageQueueChanged, topic='{}', mqAll=`{}`, mqDivided=`{}`",
topic, mqAll, mqDivided);
switch (consumer.getMessageModel()) {
case BROADCASTING:
RocketMQMessageSource.this.resetMessageQueues(mqAll);
break;
case CLUSTERING:
RocketMQMessageSource.this.resetMessageQueues(mqDivided);
break;
default:
break;
}
}
});
consumer.start();
}
catch (MQClientException e) {
log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e);
}
this.setRunning(true);
}
@Override
public synchronized void stop() {
if (this.isRunning()) {
this.setRunning(false);
consumer.shutdown();
}
}
@Override
public synchronized boolean isRunning() {
return running;
}
@Override
protected synchronized Object doReceive() {
if (messageQueueChooser.getMessageQueues() == null
|| messageQueueChooser.getMessageQueues().size() == 0) {
return null;
}
try {
int count = 0;
while (count < messageQueueChooser.getMessageQueues().size()) {
MessageQueue messageQueue;
synchronized (this.consumerMonitor) {
messageQueue = messageQueueChooser.choose();
messageQueueChooser.increment();
}
long offset = consumer.fetchConsumeOffset(messageQueue,
rocketMQConsumerProperties.getExtension().isFromStore());
log.debug("topic='{}', group='{}', messageQueue='{}', offset now='{}'",
this.topic, this.group, messageQueue, offset);
PullResult pullResult;
if (messageSelector != null) {
pullResult = consumer.pull(messageQueue, messageSelector, offset, 1);
}
else {
pullResult = consumer.pull(messageQueue, (String) null, offset, 1);
}
if (pullResult.getPullStatus() == PullStatus.FOUND) {
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
Message message = RocketMQUtil
.convertToSpringMessage(messageExtList.get(0));
AcknowledgmentCallback ackCallback = this.ackCallbackFactory
.createCallback(new RocketMQAckInfo(messageQueue, pullResult,
consumer, offset));
Message messageResult = MessageBuilder.fromMessage(message).setHeader(
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
ackCallback).build();
return messageResult;
}
else {
log.debug("messageQueue='{}' PullResult='{}' with topic `{}`",
messageQueueChooser.getMessageQueues(),
pullResult.getPullStatus(), topic);
}
count++;
}
}
catch (Exception e) {
log.error("Consumer pull error: " + e.getMessage(), e);
}
return null;
}
@Override
public String getComponentType() {
return "rocketmq:message-source";
}
public synchronized void setRunning(boolean running) {
this.running = running;
}
public synchronized void resetMessageQueues(Set<MessageQueue> queueSet) {
log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, queueSet);
synchronized (this.consumerMonitor) {
this.messageQueueChooser.reset(queueSet);
}
}
public static class RocketMQCallbackFactory
implements AcknowledgmentCallbackFactory<RocketMQAckInfo> {
@Override
public AcknowledgmentCallback createCallback(RocketMQAckInfo info) {
return new RocketMQAckCallback(info);
}
}
public static class RocketMQAckCallback implements AcknowledgmentCallback {
private final RocketMQAckInfo ackInfo;
private boolean acknowledged;
private boolean autoAckEnabled = true;
public RocketMQAckCallback(RocketMQAckInfo ackInfo) {
this.ackInfo = ackInfo;
}
protected void setAcknowledged(boolean acknowledged) {
this.acknowledged = acknowledged;
}
@Override
public boolean isAcknowledged() {
return this.acknowledged;
}
@Override
public void noAutoAck() {
this.autoAckEnabled = false;
}
@Override
public boolean isAutoAck() {
return this.autoAckEnabled;
}
@Override
public void acknowledge(Status status) {
Assert.notNull(status, "'status' cannot be null");
if (this.acknowledged) {
throw new IllegalStateException("Already acknowledged");
}
log.debug("acknowledge(" + status.name() + ") for " + this);
synchronized (this.ackInfo.getConsumerMonitor()) {
try {
switch (status) {
case ACCEPT:
case REJECT:
ackInfo.getConsumer().updateConsumeOffset(
ackInfo.getMessageQueue(),
ackInfo.getPullResult().getNextBeginOffset());
log.debug("messageQueue='{}' offset update to `{}`",
ackInfo.getMessageQueue(), String.valueOf(
ackInfo.getPullResult().getNextBeginOffset()));
break;
case REQUEUE:
// decrease index and update offset of messageQueue of ackInfo
int oldIndex = ackInfo.getMessageQueueChooser().requeue();
ackInfo.getConsumer().updateConsumeOffset(
ackInfo.getMessageQueue(), ackInfo.getOldOffset());
log.debug(
"messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'",
ackInfo.getMessageQueue(), oldIndex,
ackInfo.getOldOffset());
break;
default:
break;
}
}
catch (MQClientException e) {
log.error("acknowledge error: " + e.getErrorMessage(), e);
}
finally {
this.acknowledged = true;
}
}
}
@Override
public String toString() {
return "RocketMQAckCallback{" + "ackInfo=" + ackInfo + ", acknowledged="
+ acknowledged + ", autoAckEnabled=" + autoAckEnabled + '}';
}
}
public class RocketMQAckInfo {
private final MessageQueue messageQueue;
private final PullResult pullResult;
private final DefaultMQPullConsumer consumer;
private final long oldOffset;
public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult,
DefaultMQPullConsumer consumer, long oldOffset) {
this.messageQueue = messageQueue;
this.pullResult = pullResult;
this.consumer = consumer;
this.oldOffset = oldOffset;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public PullResult getPullResult() {
return pullResult;
}
public DefaultMQPullConsumer getConsumer() {
return consumer;
}
public RocketMQMessageQueueChooser getMessageQueueChooser() {
return RocketMQMessageSource.this.messageQueueChooser;
}
public long getOldOffset() {
return oldOffset;
}
public Object getConsumerMonitor() {
return RocketMQMessageSource.this.consumerMonitor;
}
@Override
public String toString() {
return "RocketMQAckInfo{" + "messageQueue=" + messageQueue + ", pullResult="
+ pullResult + ", consumer=" + consumer + ", oldOffset=" + oldOffset
+ '}';
}
}
}

@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@ -51,7 +52,9 @@ public class RocketMQConsumerProperties {
private Boolean orderly = false;
/**
* for concurrently listener. message consume retry strategy
* for concurrently listener. message consume retry strategy. see
* {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}. -1 means dlq(or
* discard, see {@link this#shouldRequeue}), others means requeue
*/
private int delayLevelWhenNextConsume = 0;
@ -62,6 +65,14 @@ public class RocketMQConsumerProperties {
private Boolean enabled = true;
// ------------ For Pull Consumer ------------
private long pullTimeout = 10 * 1000;
private boolean fromStore;
// ------------ For Pull Consumer ------------
public String getTags() {
return tags;
}
@ -117,4 +128,24 @@ public class RocketMQConsumerProperties {
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public long getPullTimeout() {
return pullTimeout;
}
public void setPullTimeout(long pullTimeout) {
this.pullTimeout = pullTimeout;
}
public boolean isFromStore() {
return fromStore;
}
public void setFromStore(boolean fromStore) {
this.fromStore = fromStore;
}
public boolean shouldRequeue() {
return delayLevelWhenNextConsume != -1;
}
}

Loading…
Cancel
Save