Use partition to implement rocketmq queue selection.

#667
pull/733/head
wangxing 6 years ago
parent bbf7bdee52
commit 443a43fe96

@ -16,9 +16,7 @@
package org.springframework.cloud.stream.binder.rocketmq;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@ -42,6 +40,7 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsu
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.StaticMessageHeaderAccessor;
@ -53,7 +52,8 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -149,6 +149,7 @@ public class RocketMQMessageChannelBinder extends
producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize());
rocketMQTemplate.setProducer(producer);
rocketMQTemplate.setMessageQueueSelector(new PartitionMessageQueueSelector());
}
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(

@ -16,8 +16,6 @@
package org.springframework.cloud.stream.binder.rocketmq.integration;
import java.util.Optional;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
@ -27,6 +25,7 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
@ -41,6 +40,8 @@ import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Optional;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ -145,36 +146,47 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
catch (Exception e) {
// ignore
}
boolean needSelectQueue = message.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER);
if (sync) {
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message,
rocketMQTemplate.getProducer().getSendMsgTimeout(),
delayLevel);
if (needSelectQueue) {
sendRes = rocketMQTemplate.syncSendOrderly(topicWithTags.toString(), message, "",
rocketMQTemplate.getProducer().getSendMsgTimeout());
} else {
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message,
rocketMQTemplate.getProducer().getSendMsgTimeout(),
delayLevel);
}
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
}
else {
rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.debug("async send to topic " + topicWithTags + " "
+ sendResult);
}
@Override
public void onException(Throwable e) {
log.error(
"RocketMQ Message hasn't been sent. Caused by "
+ e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(
RocketMQMessageHandler.this.errorMessageStrategy
.buildErrorMessage(
new MessagingException(
message, e),
null));
}
}
});
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.debug("async send to topic " + topicWithTags + " "
+ sendResult);
}
@Override
public void onException(Throwable e) {
log.error(
"RocketMQ Message hasn't been sent. Caused by "
+ e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(
RocketMQMessageHandler.this.errorMessageStrategy
.buildErrorMessage(
new MessagingException(
message, e),
null));
}
}
};
if (needSelectQueue) {
rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(), message, "", sendCallback,
rocketMQTemplate.getProducer().getSendMsgTimeout());
} else {
rocketMQTemplate.asyncSend(topicWithTags.toString(), message, sendCallback);
}
}
}
if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {

@ -0,0 +1,34 @@
package org.springframework.cloud.stream.binder.rocketmq.provisioning.selector;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import java.util.List;
/**
* @author wangxing
* @create 2019/7/3
*/
public class PartitionMessageQueueSelector implements MessageQueueSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionMessageQueueSelector.class);
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer partition = 0;
try {
partition = Integer.valueOf(msg.getProperty(BinderHeaders.PARTITION_HEADER));
} catch (NumberFormatException ignored) {
}
if (partition >= mqs.size()) {
LOGGER.warn("the partition '{}' is greater than the number of queues '{}'.", partition, mqs.size());
partition = partition % mqs.size();
}
return mqs.get(partition);
}
}
Loading…
Cancel
Save