diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java new file mode 100644 index 000000000..35663890f --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.rocketmq.common.message.MessageQueue; + +/** + * @author Jim + */ +public class RocketMQMessageQueueChooser { + + private int queueIndex = 0; + + private volatile List messageQueues; + + public MessageQueue choose() { + return messageQueues.get(queueIndex); + } + + public synchronized int requeue() { + if (queueIndex - 1 < 0) { + this.queueIndex = messageQueues.size() - 1; + } + else { + this.queueIndex = this.queueIndex - 1; + } + return this.queueIndex; + } + + public void increment() { + this.queueIndex = (this.queueIndex + 1) % messageQueues.size(); + } + + public void reset(Set queueSet) { + this.messageQueues = null; + this.messageQueues = new ArrayList<>(queueSet); + this.queueIndex = 0; + } + + public List getMessageQueues() { + return messageQueues; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java new file mode 100644 index 000000000..316a40bf9 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java @@ -0,0 +1,387 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.integration; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.spring.support.RocketMQUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.context.Lifecycle; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.acks.AcknowledgmentCallback; +import org.springframework.integration.acks.AcknowledgmentCallbackFactory; +import org.springframework.integration.endpoint.AbstractMessageSource; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * @author Jim + */ +public class RocketMQMessageSource extends AbstractMessageSource + implements DisposableBean, Lifecycle { + + private final static Logger log = LoggerFactory + .getLogger(RocketMQMessageSource.class); + + private final RocketMQCallbackFactory ackCallbackFactory; + + private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties; + + private final ExtendedConsumerProperties rocketMQConsumerProperties; + + private final String topic; + + private final String group; + + private final Lock lock = new ReentrantLock(); + + private DefaultMQPullConsumer consumer; + + private boolean running; + + private MessageSelector messageSelector; + + private RocketMQMessageQueueChooser messageQueueChooser = new RocketMQMessageQueueChooser(); + + public RocketMQMessageSource( + RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties, + ExtendedConsumerProperties rocketMQConsumerProperties, + String topic, String group) { + this(new RocketMQCallbackFactory(), rocketMQBinderConfigurationProperties, + rocketMQConsumerProperties, topic, group); + } + + public RocketMQMessageSource(RocketMQCallbackFactory ackCallbackFactory, + RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties, + ExtendedConsumerProperties rocketMQConsumerProperties, + String topic, String group) { + this.ackCallbackFactory = ackCallbackFactory; + this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties; + this.rocketMQConsumerProperties = rocketMQConsumerProperties; + this.topic = topic; + this.group = group; + } + + @Override + public void start() { + if (this.isRunning()) { + throw new IllegalStateException( + "pull consumer already running. " + this.toString()); + } + try { + consumer = new DefaultMQPullConsumer(group); + consumer.setNamesrvAddr( + rocketMQBinderConfigurationProperties.getNameServer()); + consumer.setConsumerPullTimeoutMillis( + rocketMQConsumerProperties.getExtension().getPullTimeout()); + consumer.setMessageModel(MessageModel.CLUSTERING); + + String tags = rocketMQConsumerProperties.getExtension().getTags(); + String sql = rocketMQConsumerProperties.getExtension().getSql(); + + if (!StringUtils.isEmpty(tags) && !StringUtils.isEmpty(sql)) { + messageSelector = MessageSelector.byTag(tags); + } + else if (!StringUtils.isEmpty(tags)) { + messageSelector = MessageSelector.byTag(tags); + } + else if (!StringUtils.isEmpty(sql)) { + messageSelector = MessageSelector.bySql(sql); + } + + consumer.registerMessageQueueListener(topic, new MessageQueueListener() { + @Override + public void messageQueueChanged(String topic, Set mqAll, + Set mqDivided) { + log.info( + "messageQueueChanged, topic='{}', mqAll=`{}`, mqDivided=`{}`", + topic, mqAll, mqDivided); + switch (consumer.getMessageModel()) { + case BROADCASTING: + RocketMQMessageSource.this.resetMessageQueues(mqAll); + break; + case CLUSTERING: + RocketMQMessageSource.this.resetMessageQueues(mqDivided); + break; + default: + break; + } + } + }); + consumer.start(); + } + catch (MQClientException e) { + log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e); + } + this.setRunning(true); + } + + @Override + public void stop() { + if (this.isRunning()) { + this.setRunning(false); + consumer.shutdown(); + } + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + protected Object doReceive() { + if (messageQueueChooser.getMessageQueues() == null + || messageQueueChooser.getMessageQueues().size() == 0) { + return null; + } + if (lock.tryLock()) { + try { + int count = 0; + while (count < messageQueueChooser.getMessageQueues().size()) { + + MessageQueue messageQueue = messageQueueChooser.choose(); + + long offset = consumer.fetchConsumeOffset(messageQueue, + rocketMQConsumerProperties.getExtension().isFromStore()); + + log.debug( + "topic='{}', group='{}', messageQueue='{}', offset now='{}'", + this.topic, this.group, messageQueue, offset); + + PullResult pullResult; + if (messageSelector != null) { + pullResult = consumer.pull(messageQueue, messageSelector, offset, + 1); + } + else { + pullResult = consumer.pull(messageQueue, (String) null, offset, + 1); + } + + if (pullResult.getPullStatus() == PullStatus.FOUND) { + List messageExtList = pullResult.getMsgFoundList(); + Message message = RocketMQUtil + .convertToSpringMessage(messageExtList.get(0)); + + AcknowledgmentCallback ackCallback = this.ackCallbackFactory + .createCallback( + new RocketMQAckInfo(messageQueue, pullResult, + consumer, messageQueueChooser, offset)); + + Message messageResult = MessageBuilder.fromMessage(message) + .setHeader( + IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, + ackCallback) + .build(); + messageQueueChooser.increment(); + return messageResult; + } + else { + log.debug("messageQueue='{}' PullResult='{}' with topic `{}`", + messageQueueChooser.getMessageQueues(), + pullResult.getPullStatus(), topic); + } + messageQueueChooser.increment(); + count++; + } + } + catch (Exception e) { + log.error("Consumer pull error: " + e.getMessage(), e); + } + finally { + lock.unlock(); + } + } + return null; + } + + @Override + public String getComponentType() { + return "rocketmq:message-source"; + } + + public void setRunning(boolean running) { + this.running = running; + } + + public void resetMessageQueues(Set queueSet) { + lock.lock(); + try { + log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic, + queueSet); + this.messageQueueChooser.reset(queueSet); + } + finally { + lock.unlock(); + } + } + + public static class RocketMQCallbackFactory + implements AcknowledgmentCallbackFactory { + + @Override + public AcknowledgmentCallback createCallback(RocketMQAckInfo info) { + return new RocketMQAckCallback(info); + } + + } + + public static class RocketMQAckCallback implements AcknowledgmentCallback { + + private final RocketMQAckInfo ackInfo; + + private boolean acknowledged; + + private boolean autoAckEnabled = true; + + public RocketMQAckCallback(RocketMQAckInfo ackInfo) { + this.ackInfo = ackInfo; + } + + protected void setAcknowledged(boolean acknowledged) { + this.acknowledged = acknowledged; + } + + @Override + public boolean isAcknowledged() { + return this.acknowledged; + } + + @Override + public void noAutoAck() { + this.autoAckEnabled = false; + } + + @Override + public boolean isAutoAck() { + return this.autoAckEnabled; + } + + @Override + public void acknowledge(Status status) { + Assert.notNull(status, "'status' cannot be null"); + log.debug("acknowledge(" + status.name() + ") for " + this); + try { + switch (status) { + case ACCEPT: + case REJECT: + ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(), + ackInfo.getPullResult().getNextBeginOffset()); + log.debug("messageQueue='{}' offset update to `{}`", + ackInfo.getMessageQueue(), + String.valueOf(ackInfo.getPullResult().getNextBeginOffset())); + break; + case REQUEUE: + // decrease index and update offset of messageQueue of ackInfo + int oldIndex = ackInfo.getMessageQueueChooser().requeue(); + ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(), + ackInfo.getOldOffset()); + log.debug( + "messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'", + ackInfo.getMessageQueue(), oldIndex, ackInfo.getOldOffset()); + break; + default: + break; + } + } + catch (MQClientException e) { + log.error("acknowledge error: " + e.getErrorMessage(), e); + } + finally { + this.acknowledged = true; + } + } + + @Override + public String toString() { + return "RocketMQAckCallback{" + "ackInfo=" + ackInfo + ", acknowledged=" + + acknowledged + ", autoAckEnabled=" + autoAckEnabled + '}'; + } + } + + public static class RocketMQAckInfo { + + private final MessageQueue messageQueue; + + private final PullResult pullResult; + + private final DefaultMQPullConsumer consumer; + + private final RocketMQMessageQueueChooser messageQueueChooser; + + private final long oldOffset; + + public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult, + DefaultMQPullConsumer consumer, + RocketMQMessageQueueChooser messageQueueChooser, long oldOffset) { + this.messageQueue = messageQueue; + this.pullResult = pullResult; + this.consumer = consumer; + this.messageQueueChooser = messageQueueChooser; + this.oldOffset = oldOffset; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public PullResult getPullResult() { + return pullResult; + } + + public DefaultMQPullConsumer getConsumer() { + return consumer; + } + + public RocketMQMessageQueueChooser getMessageQueueChooser() { + return messageQueueChooser; + } + + public long getOldOffset() { + return oldOffset; + } + + @Override + public String toString() { + return "RocketMQAckInfo{" + "messageQueue=" + messageQueue + ", pullResult=" + + pullResult + ", consumer=" + consumer + ", messageQueueChooser=" + + messageQueueChooser + ", oldOffset=" + oldOffset + '}'; + } + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 6cfe9b842..00b41a032 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -62,6 +62,14 @@ public class RocketMQConsumerProperties { private Boolean enabled = true; + // ------------ For Pull Consumer ------------ + + private long pullTimeout = 10 * 1000; + + private boolean fromStore; + + // ------------ For Pull Consumer ------------ + public String getTags() { return tags; } @@ -117,4 +125,20 @@ public class RocketMQConsumerProperties { public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; } + + public long getPullTimeout() { + return pullTimeout; + } + + public void setPullTimeout(long pullTimeout) { + this.pullTimeout = pullTimeout; + } + + public boolean isFromStore() { + return fromStore; + } + + public void setFromStore(boolean fromStore) { + this.fromStore = fromStore; + } }