pull/611/head
fangjian0423 6 years ago
parent 04613951be
commit eabbc53b4b

@ -0,0 +1,61 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageQueueChooser {
private int queueIndex = 0;
private volatile List<MessageQueue> messageQueues;
public MessageQueue choose() {
return messageQueues.get(queueIndex);
}
public synchronized int requeue() {
if (queueIndex - 1 < 0) {
this.queueIndex = messageQueues.size() - 1;
}
else {
this.queueIndex = this.queueIndex - 1;
}
return this.queueIndex;
}
public void increment() {
this.queueIndex = (this.queueIndex + 1) % messageQueues.size();
}
public void reset(Set<MessageQueue> queueSet) {
this.messageQueues = null;
this.messageQueues = new ArrayList<>(queueSet);
this.queueIndex = 0;
}
public List<MessageQueue> getMessageQueues() {
return messageQueues;
}
}

@ -0,0 +1,387 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.integration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageSource extends AbstractMessageSource<Object>
implements DisposableBean, Lifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQMessageSource.class);
private final RocketMQCallbackFactory ackCallbackFactory;
private final RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
private final String topic;
private final String group;
private final Lock lock = new ReentrantLock();
private DefaultMQPullConsumer consumer;
private boolean running;
private MessageSelector messageSelector;
private RocketMQMessageQueueChooser messageQueueChooser = new RocketMQMessageQueueChooser();
public RocketMQMessageSource(
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
String topic, String group) {
this(new RocketMQCallbackFactory(), rocketMQBinderConfigurationProperties,
rocketMQConsumerProperties, topic, group);
}
public RocketMQMessageSource(RocketMQCallbackFactory ackCallbackFactory,
RocketMQBinderConfigurationProperties rocketMQBinderConfigurationProperties,
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
String topic, String group) {
this.ackCallbackFactory = ackCallbackFactory;
this.rocketMQBinderConfigurationProperties = rocketMQBinderConfigurationProperties;
this.rocketMQConsumerProperties = rocketMQConsumerProperties;
this.topic = topic;
this.group = group;
}
@Override
public void start() {
if (this.isRunning()) {
throw new IllegalStateException(
"pull consumer already running. " + this.toString());
}
try {
consumer = new DefaultMQPullConsumer(group);
consumer.setNamesrvAddr(
rocketMQBinderConfigurationProperties.getNameServer());
consumer.setConsumerPullTimeoutMillis(
rocketMQConsumerProperties.getExtension().getPullTimeout());
consumer.setMessageModel(MessageModel.CLUSTERING);
String tags = rocketMQConsumerProperties.getExtension().getTags();
String sql = rocketMQConsumerProperties.getExtension().getSql();
if (!StringUtils.isEmpty(tags) && !StringUtils.isEmpty(sql)) {
messageSelector = MessageSelector.byTag(tags);
}
else if (!StringUtils.isEmpty(tags)) {
messageSelector = MessageSelector.byTag(tags);
}
else if (!StringUtils.isEmpty(sql)) {
messageSelector = MessageSelector.bySql(sql);
}
consumer.registerMessageQueueListener(topic, new MessageQueueListener() {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
log.info(
"messageQueueChanged, topic='{}', mqAll=`{}`, mqDivided=`{}`",
topic, mqAll, mqDivided);
switch (consumer.getMessageModel()) {
case BROADCASTING:
RocketMQMessageSource.this.resetMessageQueues(mqAll);
break;
case CLUSTERING:
RocketMQMessageSource.this.resetMessageQueues(mqDivided);
break;
default:
break;
}
}
});
consumer.start();
}
catch (MQClientException e) {
log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), e);
}
this.setRunning(true);
}
@Override
public void stop() {
if (this.isRunning()) {
this.setRunning(false);
consumer.shutdown();
}
}
@Override
public boolean isRunning() {
return running;
}
@Override
protected Object doReceive() {
if (messageQueueChooser.getMessageQueues() == null
|| messageQueueChooser.getMessageQueues().size() == 0) {
return null;
}
if (lock.tryLock()) {
try {
int count = 0;
while (count < messageQueueChooser.getMessageQueues().size()) {
MessageQueue messageQueue = messageQueueChooser.choose();
long offset = consumer.fetchConsumeOffset(messageQueue,
rocketMQConsumerProperties.getExtension().isFromStore());
log.debug(
"topic='{}', group='{}', messageQueue='{}', offset now='{}'",
this.topic, this.group, messageQueue, offset);
PullResult pullResult;
if (messageSelector != null) {
pullResult = consumer.pull(messageQueue, messageSelector, offset,
1);
}
else {
pullResult = consumer.pull(messageQueue, (String) null, offset,
1);
}
if (pullResult.getPullStatus() == PullStatus.FOUND) {
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
Message message = RocketMQUtil
.convertToSpringMessage(messageExtList.get(0));
AcknowledgmentCallback ackCallback = this.ackCallbackFactory
.createCallback(
new RocketMQAckInfo(messageQueue, pullResult,
consumer, messageQueueChooser, offset));
Message messageResult = MessageBuilder.fromMessage(message)
.setHeader(
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
ackCallback)
.build();
messageQueueChooser.increment();
return messageResult;
}
else {
log.debug("messageQueue='{}' PullResult='{}' with topic `{}`",
messageQueueChooser.getMessageQueues(),
pullResult.getPullStatus(), topic);
}
messageQueueChooser.increment();
count++;
}
}
catch (Exception e) {
log.error("Consumer pull error: " + e.getMessage(), e);
}
finally {
lock.unlock();
}
}
return null;
}
@Override
public String getComponentType() {
return "rocketmq:message-source";
}
public void setRunning(boolean running) {
this.running = running;
}
public void resetMessageQueues(Set<MessageQueue> queueSet) {
lock.lock();
try {
log.info("resetMessageQueues, topic='{}', messageQueue=`{}`", topic,
queueSet);
this.messageQueueChooser.reset(queueSet);
}
finally {
lock.unlock();
}
}
public static class RocketMQCallbackFactory
implements AcknowledgmentCallbackFactory<RocketMQAckInfo> {
@Override
public AcknowledgmentCallback createCallback(RocketMQAckInfo info) {
return new RocketMQAckCallback(info);
}
}
public static class RocketMQAckCallback implements AcknowledgmentCallback {
private final RocketMQAckInfo ackInfo;
private boolean acknowledged;
private boolean autoAckEnabled = true;
public RocketMQAckCallback(RocketMQAckInfo ackInfo) {
this.ackInfo = ackInfo;
}
protected void setAcknowledged(boolean acknowledged) {
this.acknowledged = acknowledged;
}
@Override
public boolean isAcknowledged() {
return this.acknowledged;
}
@Override
public void noAutoAck() {
this.autoAckEnabled = false;
}
@Override
public boolean isAutoAck() {
return this.autoAckEnabled;
}
@Override
public void acknowledge(Status status) {
Assert.notNull(status, "'status' cannot be null");
log.debug("acknowledge(" + status.name() + ") for " + this);
try {
switch (status) {
case ACCEPT:
case REJECT:
ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(),
ackInfo.getPullResult().getNextBeginOffset());
log.debug("messageQueue='{}' offset update to `{}`",
ackInfo.getMessageQueue(),
String.valueOf(ackInfo.getPullResult().getNextBeginOffset()));
break;
case REQUEUE:
// decrease index and update offset of messageQueue of ackInfo
int oldIndex = ackInfo.getMessageQueueChooser().requeue();
ackInfo.getConsumer().updateConsumeOffset(ackInfo.getMessageQueue(),
ackInfo.getOldOffset());
log.debug(
"messageQueue='{}' offset requeue to index:`{}`, oldOffset:'{}'",
ackInfo.getMessageQueue(), oldIndex, ackInfo.getOldOffset());
break;
default:
break;
}
}
catch (MQClientException e) {
log.error("acknowledge error: " + e.getErrorMessage(), e);
}
finally {
this.acknowledged = true;
}
}
@Override
public String toString() {
return "RocketMQAckCallback{" + "ackInfo=" + ackInfo + ", acknowledged="
+ acknowledged + ", autoAckEnabled=" + autoAckEnabled + '}';
}
}
public static class RocketMQAckInfo {
private final MessageQueue messageQueue;
private final PullResult pullResult;
private final DefaultMQPullConsumer consumer;
private final RocketMQMessageQueueChooser messageQueueChooser;
private final long oldOffset;
public RocketMQAckInfo(MessageQueue messageQueue, PullResult pullResult,
DefaultMQPullConsumer consumer,
RocketMQMessageQueueChooser messageQueueChooser, long oldOffset) {
this.messageQueue = messageQueue;
this.pullResult = pullResult;
this.consumer = consumer;
this.messageQueueChooser = messageQueueChooser;
this.oldOffset = oldOffset;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public PullResult getPullResult() {
return pullResult;
}
public DefaultMQPullConsumer getConsumer() {
return consumer;
}
public RocketMQMessageQueueChooser getMessageQueueChooser() {
return messageQueueChooser;
}
public long getOldOffset() {
return oldOffset;
}
@Override
public String toString() {
return "RocketMQAckInfo{" + "messageQueue=" + messageQueue + ", pullResult="
+ pullResult + ", consumer=" + consumer + ", messageQueueChooser="
+ messageQueueChooser + ", oldOffset=" + oldOffset + '}';
}
}
}

@ -62,6 +62,14 @@ public class RocketMQConsumerProperties {
private Boolean enabled = true;
// ------------ For Pull Consumer ------------
private long pullTimeout = 10 * 1000;
private boolean fromStore;
// ------------ For Pull Consumer ------------
public String getTags() {
return tags;
}
@ -117,4 +125,20 @@ public class RocketMQConsumerProperties {
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public long getPullTimeout() {
return pullTimeout;
}
public void setPullTimeout(long pullTimeout) {
this.pullTimeout = pullTimeout;
}
public boolean isFromStore() {
return fromStore;
}
public void setFromStore(boolean fromStore) {
this.fromStore = fromStore;
}
}

Loading…
Cancel
Save