From aa4b8790e293feafcad663d33d772f179afe8002 Mon Sep 17 00:00:00 2001
From: xiejiashuai <707094980@qq.com>
Date: Wed, 7 Aug 2019 15:36:15 +0800
Subject: [PATCH 1/4] add reconsume times header
---
.../rocketmq/RocketMQBinderConstants.java | 26 +-
.../RocketMQListenerBindingContainer.java | 755 +++++++++---------
2 files changed, 399 insertions(+), 382 deletions(-)
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
index 72c974203..097b03d13 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
@@ -16,21 +16,29 @@
package com.alibaba.cloud.stream.binder.rocketmq;
+import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
+
/**
* @author Jim
+ * @author Xiejiashuai
*/
public interface RocketMQBinderConstants {
- /**
- * Header key
- */
- String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
+ /**
+ * Header key
+ */
+ String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
+
+ /**
+ * Default value
+ */
+ String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
- /**
- * Default value
- */
- String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
+ String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
- String DEFAULT_GROUP = "rocketmq_binder_default_group_name";
+ /**
+ * RocketMQ re-consume times
+ */
+ String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
index 1a11324f8..05f06678d 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
@@ -16,19 +16,14 @@
package com.alibaba.cloud.stream.binder.rocketmq.consuming;
-import java.util.List;
-import java.util.Objects;
-
+import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
@@ -49,374 +44,388 @@ import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
-import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import java.util.List;
+import java.util.Objects;
+
+import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
/**
+ * A class that Listen on rocketmq message
+ *
+ * this class will delegate {@link RocketMQListener} to handle message
+ *
* @author Jim
+ * @author Xiejiashuai
+ * @see RocketMQListener
*/
public class RocketMQListenerBindingContainer
- implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
-
- private final static Logger log = LoggerFactory
- .getLogger(RocketMQListenerBindingContainer.class);
-
- private long suspendCurrentQueueTimeMillis = 1000;
-
- /**
- * Message consume retry strategy
- * -1,no retry,put into DLQ directly
- * 0,broker control retry frequency
- * >0,client control retry frequency.
- */
- private int delayLevelWhenNextConsume = 0;
-
- private String nameServer;
-
- private String consumerGroup;
-
- private String topic;
-
- private int consumeThreadMax = 64;
-
- private String charset = "UTF-8";
-
- private RocketMQListener rocketMQListener;
-
- private DefaultMQPushConsumer consumer;
-
- private boolean running;
-
- private final ExtendedConsumerProperties rocketMQConsumerProperties;
-
- private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
-
- // The following properties came from RocketMQConsumerProperties.
- private ConsumeMode consumeMode;
- private SelectorType selectorType;
- private String selectorExpression;
- private MessageModel messageModel;
-
- public RocketMQListenerBindingContainer(
- ExtendedConsumerProperties rocketMQConsumerProperties,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
- RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
- this.rocketMQConsumerProperties = rocketMQConsumerProperties;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
- this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
- ? ConsumeMode.ORDERLY
- : ConsumeMode.CONCURRENTLY;
- if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
- this.selectorType = SelectorType.TAG;
- this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
- }
- else {
- this.selectorType = SelectorType.SQL92;
- this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
- }
- this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
- ? MessageModel.BROADCASTING
- : MessageModel.CLUSTERING;
- }
-
- @Override
- public void setupMessageListener(RocketMQListener> rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
-
- @Override
- public void destroy() throws Exception {
- this.setRunning(false);
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- log.info("container destroyed, {}", this.toString());
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- initRocketMQPushConsumer();
- }
-
- @Override
- public boolean isAutoStartup() {
- return true;
- }
-
- @Override
- public void stop(Runnable callback) {
- stop();
- callback.run();
- }
-
- @Override
- public void start() {
- if (this.isRunning()) {
- throw new IllegalStateException(
- "container already running. " + this.toString());
- }
-
- try {
- consumer.start();
- }
- catch (MQClientException e) {
- throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
- }
- this.setRunning(true);
-
- log.info("running container: {}", this.toString());
- }
-
- @Override
- public void stop() {
- if (this.isRunning()) {
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- setRunning(false);
- }
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
- private void setRunning(boolean running) {
- this.running = running;
- }
-
- @Override
- public int getPhase() {
- return Integer.MAX_VALUE;
- }
-
- private void initRocketMQPushConsumer() throws MQClientException {
- Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
- Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
- Assert.notNull(nameServer, "Property 'nameServer' is required");
- Assert.notNull(topic, "Property 'topic' is required");
-
- String ak = rocketBinderConfigurationProperties.getAccessKey();
- String sk = rocketBinderConfigurationProperties.getSecretKey();
- if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
- RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
- consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
- new AllocateMessageQueueAveragely(),
- rocketBinderConfigurationProperties.isEnableMsgTrace(),
- rocketBinderConfigurationProperties.getCustomizedTraceTopic());
- consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
- topic + "|" + UtilAll.getPid()));
- consumer.setVipChannelEnabled(false);
- }
- else {
- consumer = new DefaultMQPushConsumer(consumerGroup,
- rocketBinderConfigurationProperties.isEnableMsgTrace(),
- rocketBinderConfigurationProperties.getCustomizedTraceTopic());
- }
-
- consumer.setNamesrvAddr(nameServer);
- consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
- consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
-
- switch (messageModel) {
- case BROADCASTING:
- consumer.setMessageModel(
- org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
- break;
- case CLUSTERING:
- consumer.setMessageModel(
- org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
- break;
- default:
- throw new IllegalArgumentException("Property 'messageModel' was wrong.");
- }
-
- switch (selectorType) {
- case TAG:
- consumer.subscribe(topic, selectorExpression);
- break;
- case SQL92:
- consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
- break;
- default:
- throw new IllegalArgumentException("Property 'selectorType' was wrong.");
- }
-
- switch (consumeMode) {
- case ORDERLY:
- consumer.setMessageListener(new DefaultMessageListenerOrderly());
- break;
- case CONCURRENTLY:
- consumer.setMessageListener(new DefaultMessageListenerConcurrently());
- break;
- default:
- throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
- }
-
- if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
- ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
- .prepareStart(consumer);
- }
-
- }
-
- @Override
- public String toString() {
- return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
- + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
- + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
- + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
- + messageModel + '}';
- }
-
- public long getSuspendCurrentQueueTimeMillis() {
- return suspendCurrentQueueTimeMillis;
- }
-
- public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
- this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
- }
-
- public int getDelayLevelWhenNextConsume() {
- return delayLevelWhenNextConsume;
- }
-
- public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
- this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
- }
-
- public String getNameServer() {
- return nameServer;
- }
-
- public void setNameServer(String nameServer) {
- this.nameServer = nameServer;
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public int getConsumeThreadMax() {
- return consumeThreadMax;
- }
-
- public void setConsumeThreadMax(int consumeThreadMax) {
- this.consumeThreadMax = consumeThreadMax;
- }
-
- public String getCharset() {
- return charset;
- }
-
- public void setCharset(String charset) {
- this.charset = charset;
- }
-
- public RocketMQListener getRocketMQListener() {
- return rocketMQListener;
- }
-
- public void setRocketMQListener(RocketMQListener rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
-
- public DefaultMQPushConsumer getConsumer() {
- return consumer;
- }
-
- public void setConsumer(DefaultMQPushConsumer consumer) {
- this.consumer = consumer;
- }
-
- public ExtendedConsumerProperties getRocketMQConsumerProperties() {
- return rocketMQConsumerProperties;
- }
-
- public ConsumeMode getConsumeMode() {
- return consumeMode;
- }
-
- public SelectorType getSelectorType() {
- return selectorType;
- }
-
- public String getSelectorExpression() {
- return selectorExpression;
- }
-
- public MessageModel getMessageModel() {
- return messageModel;
- }
-
- public class DefaultMessageListenerConcurrently
- implements MessageListenerConcurrently {
-
- @SuppressWarnings("unchecked")
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List msgs,
- ConsumeConcurrentlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener
- .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
- }
- catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
-
- public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
-
- @SuppressWarnings("unchecked")
- @Override
- public ConsumeOrderlyStatus consumeMessage(List msgs,
- ConsumeOrderlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener
- .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
- }
- catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setSuspendCurrentQueueTimeMillis(
- suspendCurrentQueueTimeMillis);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
-
- return ConsumeOrderlyStatus.SUCCESS;
- }
- }
+ implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
+
+ private final static Logger log = LoggerFactory
+ .getLogger(RocketMQListenerBindingContainer.class);
+
+ private long suspendCurrentQueueTimeMillis = 1000;
+
+ /**
+ * Message consume retry strategy
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency.
+ */
+ private int delayLevelWhenNextConsume = 0;
+
+ private String nameServer;
+
+ private String consumerGroup;
+
+ private String topic;
+
+ private int consumeThreadMax = 64;
+
+ private String charset = "UTF-8";
+
+ private RocketMQListener rocketMQListener;
+
+ private DefaultMQPushConsumer consumer;
+
+ private boolean running;
+
+ private final ExtendedConsumerProperties rocketMQConsumerProperties;
+
+ private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ // The following properties came from RocketMQConsumerProperties.
+ private ConsumeMode consumeMode;
+ private SelectorType selectorType;
+ private String selectorExpression;
+ private MessageModel messageModel;
+
+ public RocketMQListenerBindingContainer(
+ ExtendedConsumerProperties rocketMQConsumerProperties,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
+ RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
+ this.rocketMQConsumerProperties = rocketMQConsumerProperties;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
+ this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
+ ? ConsumeMode.ORDERLY
+ : ConsumeMode.CONCURRENTLY;
+ if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
+ this.selectorType = SelectorType.TAG;
+ this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
+ } else {
+ this.selectorType = SelectorType.SQL92;
+ this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
+ }
+ this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
+ ? MessageModel.BROADCASTING
+ : MessageModel.CLUSTERING;
+ }
+
+ @Override
+ public void setupMessageListener(RocketMQListener> rocketMQListener) {
+ this.rocketMQListener = rocketMQListener;
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ this.setRunning(false);
+ if (Objects.nonNull(consumer)) {
+ consumer.shutdown();
+ }
+ log.info("container destroyed, {}", this.toString());
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ initRocketMQPushConsumer();
+ }
+
+ @Override
+ public boolean isAutoStartup() {
+ return true;
+ }
+
+ @Override
+ public void stop(Runnable callback) {
+ stop();
+ callback.run();
+ }
+
+ @Override
+ public void start() {
+ if (this.isRunning()) {
+ throw new IllegalStateException(
+ "container already running. " + this.toString());
+ }
+
+ try {
+ consumer.start();
+ } catch (MQClientException e) {
+ throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
+ }
+ this.setRunning(true);
+
+ log.info("running container: {}", this.toString());
+ }
+
+ @Override
+ public void stop() {
+ if (this.isRunning()) {
+ if (Objects.nonNull(consumer)) {
+ consumer.shutdown();
+ }
+ setRunning(false);
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ private void setRunning(boolean running) {
+ this.running = running;
+ }
+
+ @Override
+ public int getPhase() {
+ return Integer.MAX_VALUE;
+ }
+
+ private void initRocketMQPushConsumer() throws MQClientException {
+ Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
+ Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
+ Assert.notNull(nameServer, "Property 'nameServer' is required");
+ Assert.notNull(topic, "Property 'topic' is required");
+
+ String ak = rocketBinderConfigurationProperties.getAccessKey();
+ String sk = rocketBinderConfigurationProperties.getSecretKey();
+ if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+ RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
+ consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
+ new AllocateMessageQueueAveragely(),
+ rocketBinderConfigurationProperties.isEnableMsgTrace(),
+ rocketBinderConfigurationProperties.getCustomizedTraceTopic());
+ consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
+ topic + "|" + UtilAll.getPid()));
+ consumer.setVipChannelEnabled(false);
+ } else {
+ consumer = new DefaultMQPushConsumer(consumerGroup,
+ rocketBinderConfigurationProperties.isEnableMsgTrace(),
+ rocketBinderConfigurationProperties.getCustomizedTraceTopic());
+ }
+
+ consumer.setNamesrvAddr(nameServer);
+ consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
+ consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
+
+ switch (messageModel) {
+ case BROADCASTING:
+ consumer.setMessageModel(
+ org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
+ break;
+ case CLUSTERING:
+ consumer.setMessageModel(
+ org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
+ break;
+ default:
+ throw new IllegalArgumentException("Property 'messageModel' was wrong.");
+ }
+
+ switch (selectorType) {
+ case TAG:
+ consumer.subscribe(topic, selectorExpression);
+ break;
+ case SQL92:
+ consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
+ break;
+ default:
+ throw new IllegalArgumentException("Property 'selectorType' was wrong.");
+ }
+
+ switch (consumeMode) {
+ case ORDERLY:
+ consumer.setMessageListener(new DefaultMessageListenerOrderly());
+ break;
+ case CONCURRENTLY:
+ consumer.setMessageListener(new DefaultMessageListenerConcurrently());
+ break;
+ default:
+ throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
+ }
+
+ if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
+ ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
+ .prepareStart(consumer);
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
+ + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
+ + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
+ + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
+ + messageModel + '}';
+ }
+
+ public long getSuspendCurrentQueueTimeMillis() {
+ return suspendCurrentQueueTimeMillis;
+ }
+
+ public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
+ this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+ }
+
+ public int getDelayLevelWhenNextConsume() {
+ return delayLevelWhenNextConsume;
+ }
+
+ public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
+ this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+ }
+
+ public String getNameServer() {
+ return nameServer;
+ }
+
+ public void setNameServer(String nameServer) {
+ this.nameServer = nameServer;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getConsumeThreadMax() {
+ return consumeThreadMax;
+ }
+
+ public void setConsumeThreadMax(int consumeThreadMax) {
+ this.consumeThreadMax = consumeThreadMax;
+ }
+
+ public String getCharset() {
+ return charset;
+ }
+
+ public void setCharset(String charset) {
+ this.charset = charset;
+ }
+
+ public RocketMQListener getRocketMQListener() {
+ return rocketMQListener;
+ }
+
+ public void setRocketMQListener(RocketMQListener rocketMQListener) {
+ this.rocketMQListener = rocketMQListener;
+ }
+
+ public DefaultMQPushConsumer getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(DefaultMQPushConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ public ExtendedConsumerProperties getRocketMQConsumerProperties() {
+ return rocketMQConsumerProperties;
+ }
+
+ public ConsumeMode getConsumeMode() {
+ return consumeMode;
+ }
+
+ public SelectorType getSelectorType() {
+ return selectorType;
+ }
+
+ public String getSelectorExpression() {
+ return selectorExpression;
+ }
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+ public class DefaultMessageListenerConcurrently
+ implements MessageListenerConcurrently {
+
+ @SuppressWarnings({"unchecked", "Duplicates"})
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs,
+ ConsumeConcurrentlyContext context) {
+ for (MessageExt messageExt : msgs) {
+ log.debug("received msg: {}", messageExt);
+ try {
+ long now = System.currentTimeMillis();
+ preOnMessage(messageExt);
+ rocketMQListener
+ .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
+ long costTime = System.currentTimeMillis() - now;
+ log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime);
+ } catch (Exception e) {
+ log.warn("consume message failed. messageExt:{}", messageExt, e);
+ context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ }
+
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ }
+
+ public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+
+ @SuppressWarnings({"unchecked", "Duplicates"})
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List msgs,
+ ConsumeOrderlyContext context) {
+ for (MessageExt messageExt : msgs) {
+ log.debug("received msg: {}", messageExt);
+ try {
+ long now = System.currentTimeMillis();
+ preOnMessage(messageExt);
+ rocketMQListener
+ .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
+ long costTime = System.currentTimeMillis() - now;
+ log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime);
+ } catch (Exception e) {
+ log.warn("consume message failed. messageExt:{}", messageExt, e);
+ context.setSuspendCurrentQueueTimeMillis(
+ suspendCurrentQueueTimeMillis);
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+ }
+ }
+
+ return ConsumeOrderlyStatus.SUCCESS;
+ }
+ }
+
+ /**
+ * pre handle message before the consumer handle message
+ *
+ * @param messageExt the rocketmq message
+ */
+ private void preOnMessage(MessageExt messageExt) {
+ int reconsumeTimes = messageExt.getReconsumeTimes();
+ messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes));
+ }
}
From a1a215f6e70fd94a934ad0c9a8dd4f085631c4ba Mon Sep 17 00:00:00 2001
From: xiejiashuai <707094980@qq.com>
Date: Wed, 7 Aug 2019 16:25:49 +0800
Subject: [PATCH 2/4] format code use eclipse
---
.../rocketmq/RocketMQBinderConstants.java | 32 +-
.../RocketMQListenerBindingContainer.java | 761 +++++++++---------
2 files changed, 401 insertions(+), 392 deletions(-)
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
index 097b03d13..e2a527dde 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
@@ -24,21 +24,21 @@ import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
*/
public interface RocketMQBinderConstants {
- /**
- * Header key
- */
- String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
-
- /**
- * Default value
- */
- String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
-
- String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
-
- /**
- * RocketMQ re-consume times
- */
- String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
+ /**
+ * Header key
+ */
+ String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
+
+ /**
+ * Default value
+ */
+ String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
+
+ String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
+
+ /**
+ * RocketMQ re-consume times
+ */
+ String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
index 05f06678d..a6d2f3a0e 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
@@ -16,9 +16,11 @@
package com.alibaba.cloud.stream.binder.rocketmq.consuming;
-import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
+
+import java.util.List;
+import java.util.Objects;
+
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -44,388 +46,395 @@ import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
-import java.util.List;
-import java.util.Objects;
-
-import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
+import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
/**
* A class that Listen on rocketmq message
*
- * this class will delegate {@link RocketMQListener} to handle message
+ * this class will delegate {@link RocketMQListener} to handle message
*
* @author Jim
* @author Xiejiashuai
* @see RocketMQListener
*/
public class RocketMQListenerBindingContainer
- implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
-
- private final static Logger log = LoggerFactory
- .getLogger(RocketMQListenerBindingContainer.class);
-
- private long suspendCurrentQueueTimeMillis = 1000;
-
- /**
- * Message consume retry strategy
- * -1,no retry,put into DLQ directly
- * 0,broker control retry frequency
- * >0,client control retry frequency.
- */
- private int delayLevelWhenNextConsume = 0;
-
- private String nameServer;
-
- private String consumerGroup;
-
- private String topic;
-
- private int consumeThreadMax = 64;
-
- private String charset = "UTF-8";
-
- private RocketMQListener rocketMQListener;
-
- private DefaultMQPushConsumer consumer;
-
- private boolean running;
-
- private final ExtendedConsumerProperties rocketMQConsumerProperties;
-
- private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
-
- // The following properties came from RocketMQConsumerProperties.
- private ConsumeMode consumeMode;
- private SelectorType selectorType;
- private String selectorExpression;
- private MessageModel messageModel;
-
- public RocketMQListenerBindingContainer(
- ExtendedConsumerProperties rocketMQConsumerProperties,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
- RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
- this.rocketMQConsumerProperties = rocketMQConsumerProperties;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
- this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
- ? ConsumeMode.ORDERLY
- : ConsumeMode.CONCURRENTLY;
- if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
- this.selectorType = SelectorType.TAG;
- this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
- } else {
- this.selectorType = SelectorType.SQL92;
- this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
- }
- this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
- ? MessageModel.BROADCASTING
- : MessageModel.CLUSTERING;
- }
-
- @Override
- public void setupMessageListener(RocketMQListener> rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
-
- @Override
- public void destroy() throws Exception {
- this.setRunning(false);
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- log.info("container destroyed, {}", this.toString());
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- initRocketMQPushConsumer();
- }
-
- @Override
- public boolean isAutoStartup() {
- return true;
- }
-
- @Override
- public void stop(Runnable callback) {
- stop();
- callback.run();
- }
-
- @Override
- public void start() {
- if (this.isRunning()) {
- throw new IllegalStateException(
- "container already running. " + this.toString());
- }
-
- try {
- consumer.start();
- } catch (MQClientException e) {
- throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
- }
- this.setRunning(true);
-
- log.info("running container: {}", this.toString());
- }
-
- @Override
- public void stop() {
- if (this.isRunning()) {
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- setRunning(false);
- }
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
- private void setRunning(boolean running) {
- this.running = running;
- }
-
- @Override
- public int getPhase() {
- return Integer.MAX_VALUE;
- }
-
- private void initRocketMQPushConsumer() throws MQClientException {
- Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
- Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
- Assert.notNull(nameServer, "Property 'nameServer' is required");
- Assert.notNull(topic, "Property 'topic' is required");
-
- String ak = rocketBinderConfigurationProperties.getAccessKey();
- String sk = rocketBinderConfigurationProperties.getSecretKey();
- if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
- RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
- consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
- new AllocateMessageQueueAveragely(),
- rocketBinderConfigurationProperties.isEnableMsgTrace(),
- rocketBinderConfigurationProperties.getCustomizedTraceTopic());
- consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
- topic + "|" + UtilAll.getPid()));
- consumer.setVipChannelEnabled(false);
- } else {
- consumer = new DefaultMQPushConsumer(consumerGroup,
- rocketBinderConfigurationProperties.isEnableMsgTrace(),
- rocketBinderConfigurationProperties.getCustomizedTraceTopic());
- }
-
- consumer.setNamesrvAddr(nameServer);
- consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
- consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
-
- switch (messageModel) {
- case BROADCASTING:
- consumer.setMessageModel(
- org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
- break;
- case CLUSTERING:
- consumer.setMessageModel(
- org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
- break;
- default:
- throw new IllegalArgumentException("Property 'messageModel' was wrong.");
- }
-
- switch (selectorType) {
- case TAG:
- consumer.subscribe(topic, selectorExpression);
- break;
- case SQL92:
- consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
- break;
- default:
- throw new IllegalArgumentException("Property 'selectorType' was wrong.");
- }
-
- switch (consumeMode) {
- case ORDERLY:
- consumer.setMessageListener(new DefaultMessageListenerOrderly());
- break;
- case CONCURRENTLY:
- consumer.setMessageListener(new DefaultMessageListenerConcurrently());
- break;
- default:
- throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
- }
-
- if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
- ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
- .prepareStart(consumer);
- }
-
- }
-
- @Override
- public String toString() {
- return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
- + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
- + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
- + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
- + messageModel + '}';
- }
-
- public long getSuspendCurrentQueueTimeMillis() {
- return suspendCurrentQueueTimeMillis;
- }
-
- public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
- this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
- }
-
- public int getDelayLevelWhenNextConsume() {
- return delayLevelWhenNextConsume;
- }
-
- public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
- this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
- }
-
- public String getNameServer() {
- return nameServer;
- }
-
- public void setNameServer(String nameServer) {
- this.nameServer = nameServer;
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public int getConsumeThreadMax() {
- return consumeThreadMax;
- }
-
- public void setConsumeThreadMax(int consumeThreadMax) {
- this.consumeThreadMax = consumeThreadMax;
- }
-
- public String getCharset() {
- return charset;
- }
-
- public void setCharset(String charset) {
- this.charset = charset;
- }
-
- public RocketMQListener getRocketMQListener() {
- return rocketMQListener;
- }
-
- public void setRocketMQListener(RocketMQListener rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
-
- public DefaultMQPushConsumer getConsumer() {
- return consumer;
- }
-
- public void setConsumer(DefaultMQPushConsumer consumer) {
- this.consumer = consumer;
- }
-
- public ExtendedConsumerProperties getRocketMQConsumerProperties() {
- return rocketMQConsumerProperties;
- }
-
- public ConsumeMode getConsumeMode() {
- return consumeMode;
- }
-
- public SelectorType getSelectorType() {
- return selectorType;
- }
-
- public String getSelectorExpression() {
- return selectorExpression;
- }
-
- public MessageModel getMessageModel() {
- return messageModel;
- }
-
- public class DefaultMessageListenerConcurrently
- implements MessageListenerConcurrently {
-
- @SuppressWarnings({"unchecked", "Duplicates"})
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List msgs,
- ConsumeConcurrentlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- preOnMessage(messageExt);
- rocketMQListener
- .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime);
- } catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
-
- public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
-
- @SuppressWarnings({"unchecked", "Duplicates"})
- @Override
- public ConsumeOrderlyStatus consumeMessage(List msgs,
- ConsumeOrderlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- preOnMessage(messageExt);
- rocketMQListener
- .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime);
- } catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setSuspendCurrentQueueTimeMillis(
- suspendCurrentQueueTimeMillis);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
-
- return ConsumeOrderlyStatus.SUCCESS;
- }
- }
-
- /**
- * pre handle message before the consumer handle message
- *
- * @param messageExt the rocketmq message
- */
- private void preOnMessage(MessageExt messageExt) {
- int reconsumeTimes = messageExt.getReconsumeTimes();
- messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes));
- }
+ implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
+
+ private final static Logger log = LoggerFactory
+ .getLogger(RocketMQListenerBindingContainer.class);
+
+ private long suspendCurrentQueueTimeMillis = 1000;
+
+ /**
+ * Message consume retry strategy
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency.
+ */
+ private int delayLevelWhenNextConsume = 0;
+
+ private String nameServer;
+
+ private String consumerGroup;
+
+ private String topic;
+
+ private int consumeThreadMax = 64;
+
+ private String charset = "UTF-8";
+
+ private RocketMQListener rocketMQListener;
+
+ private DefaultMQPushConsumer consumer;
+
+ private boolean running;
+
+ private final ExtendedConsumerProperties rocketMQConsumerProperties;
+
+ private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ // The following properties came from RocketMQConsumerProperties.
+ private ConsumeMode consumeMode;
+ private SelectorType selectorType;
+ private String selectorExpression;
+ private MessageModel messageModel;
+
+ public RocketMQListenerBindingContainer(
+ ExtendedConsumerProperties rocketMQConsumerProperties,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
+ RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
+ this.rocketMQConsumerProperties = rocketMQConsumerProperties;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
+ this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
+ ? ConsumeMode.ORDERLY
+ : ConsumeMode.CONCURRENTLY;
+ if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
+ this.selectorType = SelectorType.TAG;
+ this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
+ }
+ else {
+ this.selectorType = SelectorType.SQL92;
+ this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
+ }
+ this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
+ ? MessageModel.BROADCASTING
+ : MessageModel.CLUSTERING;
+ }
+
+ @Override
+ public void setupMessageListener(RocketMQListener> rocketMQListener) {
+ this.rocketMQListener = rocketMQListener;
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ this.setRunning(false);
+ if (Objects.nonNull(consumer)) {
+ consumer.shutdown();
+ }
+ log.info("container destroyed, {}", this.toString());
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ initRocketMQPushConsumer();
+ }
+
+ @Override
+ public boolean isAutoStartup() {
+ return true;
+ }
+
+ @Override
+ public void stop(Runnable callback) {
+ stop();
+ callback.run();
+ }
+
+ @Override
+ public void start() {
+ if (this.isRunning()) {
+ throw new IllegalStateException(
+ "container already running. " + this.toString());
+ }
+
+ try {
+ consumer.start();
+ }
+ catch (MQClientException e) {
+ throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
+ }
+ this.setRunning(true);
+
+ log.info("running container: {}", this.toString());
+ }
+
+ @Override
+ public void stop() {
+ if (this.isRunning()) {
+ if (Objects.nonNull(consumer)) {
+ consumer.shutdown();
+ }
+ setRunning(false);
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ private void setRunning(boolean running) {
+ this.running = running;
+ }
+
+ @Override
+ public int getPhase() {
+ return Integer.MAX_VALUE;
+ }
+
+ private void initRocketMQPushConsumer() throws MQClientException {
+ Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
+ Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
+ Assert.notNull(nameServer, "Property 'nameServer' is required");
+ Assert.notNull(topic, "Property 'topic' is required");
+
+ String ak = rocketBinderConfigurationProperties.getAccessKey();
+ String sk = rocketBinderConfigurationProperties.getSecretKey();
+ if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
+ RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
+ consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
+ new AllocateMessageQueueAveragely(),
+ rocketBinderConfigurationProperties.isEnableMsgTrace(),
+ rocketBinderConfigurationProperties.getCustomizedTraceTopic());
+ consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
+ topic + "|" + UtilAll.getPid()));
+ consumer.setVipChannelEnabled(false);
+ }
+ else {
+ consumer = new DefaultMQPushConsumer(consumerGroup,
+ rocketBinderConfigurationProperties.isEnableMsgTrace(),
+ rocketBinderConfigurationProperties.getCustomizedTraceTopic());
+ }
+
+ consumer.setNamesrvAddr(nameServer);
+ consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
+ consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
+
+ switch (messageModel) {
+ case BROADCASTING:
+ consumer.setMessageModel(
+ org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
+ break;
+ case CLUSTERING:
+ consumer.setMessageModel(
+ org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
+ break;
+ default:
+ throw new IllegalArgumentException("Property 'messageModel' was wrong.");
+ }
+
+ switch (selectorType) {
+ case TAG:
+ consumer.subscribe(topic, selectorExpression);
+ break;
+ case SQL92:
+ consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
+ break;
+ default:
+ throw new IllegalArgumentException("Property 'selectorType' was wrong.");
+ }
+
+ switch (consumeMode) {
+ case ORDERLY:
+ consumer.setMessageListener(new DefaultMessageListenerOrderly());
+ break;
+ case CONCURRENTLY:
+ consumer.setMessageListener(new DefaultMessageListenerConcurrently());
+ break;
+ default:
+ throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
+ }
+
+ if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
+ ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
+ .prepareStart(consumer);
+ }
+
+ }
+
+ @Override
+ public String toString() {
+ return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
+ + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
+ + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
+ + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
+ + messageModel + '}';
+ }
+
+ public long getSuspendCurrentQueueTimeMillis() {
+ return suspendCurrentQueueTimeMillis;
+ }
+
+ public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
+ this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+ }
+
+ public int getDelayLevelWhenNextConsume() {
+ return delayLevelWhenNextConsume;
+ }
+
+ public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
+ this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+ }
+
+ public String getNameServer() {
+ return nameServer;
+ }
+
+ public void setNameServer(String nameServer) {
+ this.nameServer = nameServer;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getConsumeThreadMax() {
+ return consumeThreadMax;
+ }
+
+ public void setConsumeThreadMax(int consumeThreadMax) {
+ this.consumeThreadMax = consumeThreadMax;
+ }
+
+ public String getCharset() {
+ return charset;
+ }
+
+ public void setCharset(String charset) {
+ this.charset = charset;
+ }
+
+ public RocketMQListener getRocketMQListener() {
+ return rocketMQListener;
+ }
+
+ public void setRocketMQListener(RocketMQListener rocketMQListener) {
+ this.rocketMQListener = rocketMQListener;
+ }
+
+ public DefaultMQPushConsumer getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(DefaultMQPushConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ public ExtendedConsumerProperties getRocketMQConsumerProperties() {
+ return rocketMQConsumerProperties;
+ }
+
+ public ConsumeMode getConsumeMode() {
+ return consumeMode;
+ }
+
+ public SelectorType getSelectorType() {
+ return selectorType;
+ }
+
+ public String getSelectorExpression() {
+ return selectorExpression;
+ }
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+ public class DefaultMessageListenerConcurrently
+ implements MessageListenerConcurrently {
+
+ @SuppressWarnings({ "unchecked", "Duplicates" })
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs,
+ ConsumeConcurrentlyContext context) {
+ for (MessageExt messageExt : msgs) {
+ log.debug("received msg: {}", messageExt);
+ try {
+ long now = System.currentTimeMillis();
+ preOnMessage(messageExt);
+ rocketMQListener
+ .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
+ long costTime = System.currentTimeMillis() - now;
+ log.debug("consume {} message key:[{}] cost: {} ms",
+ messageExt.getMsgId(), messageExt.getKeys(), costTime);
+ }
+ catch (Exception e) {
+ log.warn("consume message failed. messageExt:{}", messageExt, e);
+ context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ }
+
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ }
+
+ public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+
+ @SuppressWarnings({ "unchecked", "Duplicates" })
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List msgs,
+ ConsumeOrderlyContext context) {
+ for (MessageExt messageExt : msgs) {
+ log.debug("received msg: {}", messageExt);
+ try {
+ long now = System.currentTimeMillis();
+ preOnMessage(messageExt);
+ rocketMQListener
+ .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
+ long costTime = System.currentTimeMillis() - now;
+ log.debug("consume {} message key:[{}] cost: {} ms",
+ messageExt.getMsgId(), messageExt.getKeys(), costTime);
+ }
+ catch (Exception e) {
+ log.warn("consume message failed. messageExt:{}", messageExt, e);
+ context.setSuspendCurrentQueueTimeMillis(
+ suspendCurrentQueueTimeMillis);
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+ }
+ }
+
+ return ConsumeOrderlyStatus.SUCCESS;
+ }
+ }
+
+ /**
+ * pre handle message before the consumer handle message
+ *
+ * @param messageExt the rocketmq message
+ */
+ private void preOnMessage(MessageExt messageExt) {
+ int reconsumeTimes = messageExt.getReconsumeTimes();
+ messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
+ String.valueOf(reconsumeTimes));
+ }
}
From 5bbbad00543caf56389f190f64bd60729818bf2e Mon Sep 17 00:00:00 2001
From: xiejiashuai <707094980@qq.com>
Date: Mon, 12 Aug 2019 22:52:14 +0800
Subject: [PATCH 3/4] extract method to convert message
---
.../RocketMQListenerBindingContainer.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
index a6d2f3a0e..22834af2b 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.SmartLifecycle;
+import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@@ -379,9 +380,7 @@ public class RocketMQListenerBindingContainer
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
- preOnMessage(messageExt);
- rocketMQListener
- .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
+ rocketMQListener.onMessage(convertToSpringMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} message key:[{}] cost: {} ms",
messageExt.getMsgId(), messageExt.getKeys(), costTime);
@@ -407,9 +406,7 @@ public class RocketMQListenerBindingContainer
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
- preOnMessage(messageExt);
- rocketMQListener
- .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
+ rocketMQListener.onMessage(convertToSpringMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} message key:[{}] cost: {} ms",
messageExt.getMsgId(), messageExt.getKeys(), costTime);
@@ -427,14 +424,19 @@ public class RocketMQListenerBindingContainer
}
/**
- * pre handle message before the consumer handle message
+ * Convert rocketmq {@link MessageExt} to Spring {@link Message}
*
* @param messageExt the rocketmq message
+ * @return the converted Spring {@link Message}
*/
- private void preOnMessage(MessageExt messageExt) {
+ private Message convertToSpringMessage(MessageExt messageExt) {
+
+ // add reconsume-times header to messageExt
int reconsumeTimes = messageExt.getReconsumeTimes();
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
String.valueOf(reconsumeTimes));
+
+ return RocketMQUtil.convertToSpringMessage(messageExt);
}
}
From 2c35e13f5e7cbb5e4f13393285aa16ab095bfc23 Mon Sep 17 00:00:00 2001
From: xiejiashuai <707094980@qq.com>
Date: Tue, 13 Aug 2019 10:07:16 +0800
Subject: [PATCH 4/4] update orderly message log level to info
---
.../rocketmq/consuming/RocketMQListenerBindingContainer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
index 22834af2b..5bf540e07 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
@@ -408,7 +408,7 @@ public class RocketMQListenerBindingContainer
long now = System.currentTimeMillis();
rocketMQListener.onMessage(convertToSpringMessage(messageExt));
long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} message key:[{}] cost: {} ms",
+ log.info("consume {} message key:[{}] cost: {} ms",
messageExt.getMsgId(), messageExt.getKeys(), costTime);
}
catch (Exception e) {