From 6d7c47a3660de3aa40fd519f52c84947638a4733 Mon Sep 17 00:00:00 2001
From: zkzlx <kiss_maple@163.com>
Date: Mon, 22 Mar 2021 11:05:31 +0800
Subject: [PATCH] Code refactoring and some new feature support - delete some
 invalid files.

---
 .../src/main/resources/application.properties |  8 ++--
 .../alibaba/cloud/examples/SenderService.java |  4 +-
 .../examples/TransactionListenerImpl.java     | 38 +++++++++----------
 .../src/main/resources/application.properties |  1 +
 .../rocketmq/contants/RocketMQConst.java      | 13 +------
 .../inbound/pull/RocketMQMessageSource.java   | 25 +++++++++---
 .../RocketMQProducerMessageHandler.java       |  2 +-
 .../binder/rocketmq/utils/RocketMQUtils.java  |  3 +-
 .../RocketMQAutoConfigurationTests.java       | 22 +++++------
 9 files changed, 61 insertions(+), 55 deletions(-)

diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties
index 2779db522..e2e77bd0b 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties
@@ -3,20 +3,20 @@ spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
 spring.cloud.stream.bindings.input1.destination=test-topic
 spring.cloud.stream.bindings.input1.content-type=text/plain
 spring.cloud.stream.bindings.input1.group=test-group1
-spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
+spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true
 
 spring.cloud.stream.bindings.input2.destination=test-topic
 spring.cloud.stream.bindings.input2.content-type=text/plain
 spring.cloud.stream.bindings.input2.group=test-group2
-spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
-spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
+spring.cloud.stream.rocketmq.bindings.input2.consumer.push.orderly=false
+spring.cloud.stream.rocketmq.bindings.input2.consumer.subscription=tagStr
 spring.cloud.stream.bindings.input2.consumer.concurrency=20
 spring.cloud.stream.bindings.input2.consumer.maxAttempts=1
 
 spring.cloud.stream.bindings.input3.destination=test-topic
 spring.cloud.stream.bindings.input3.content-type=application/json
 spring.cloud.stream.bindings.input3.group=test-group3
-spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
+spring.cloud.stream.rocketmq.bindings.input3.consumer.subscription=tagObj
 spring.cloud.stream.bindings.input3.consumer.concurrency=20
 
 spring.cloud.stream.bindings.input4.destination=TransactionTopic
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/SenderService.java
index cd9e50939..f7db15b57 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/SenderService.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/SenderService.java
@@ -20,8 +20,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.alibaba.cloud.examples.RocketMQProduceApplication.MySource;
+import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst;
 import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.spring.support.RocketMQHeaders;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.messaging.Message;
@@ -62,7 +62,7 @@ public class SenderService {
 		MessageBuilder builder = MessageBuilder.withPayload(msg)
 				.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
 		builder.setHeader("test", String.valueOf(num));
-		builder.setHeader(RocketMQHeaders.TAGS, "binder");
+		builder.setHeader(RocketMQConst.USER_TRANSACTIONAL_ARGS, "binder");
 		Message message = builder.build();
 		source.output2().send(message);
 	}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java
index e58beb221..1cc1b3290 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java
@@ -16,43 +16,43 @@
 
 package com.alibaba.cloud.examples;
 
-import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
-import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
-import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
 
-import org.springframework.messaging.Message;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.stereotype.Component;
 
 /**
  * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
  */
-@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5,
-		maximumPoolSize = 10)
-public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
+@Component("myTransactionListener")
+public class TransactionListenerImpl implements TransactionListener {
 
 	@Override
-	public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
-			Object arg) {
-		Object num = msg.getHeaders().get("test");
+	public LocalTransactionState executeLocalTransaction(Message msg,
+														 Object arg) {
+		Object num = msg.getProperty("test");
 
 		if ("1".equals(num)) {
 			System.out.println(
-					"executer: " + new String((byte[]) msg.getPayload()) + " unknown");
-			return RocketMQLocalTransactionState.UNKNOWN;
+					"executer: " + new String(msg.getBody()) + " unknown");
+			return LocalTransactionState.UNKNOW;
 		}
 		else if ("2".equals(num)) {
 			System.out.println(
-					"executer: " + new String((byte[]) msg.getPayload()) + " rollback");
-			return RocketMQLocalTransactionState.ROLLBACK;
+					"executer: " + new String(msg.getBody()) + " rollback");
+			return LocalTransactionState.ROLLBACK_MESSAGE;
 		}
 		System.out.println(
-				"executer: " + new String((byte[]) msg.getPayload()) + " commit");
-		return RocketMQLocalTransactionState.COMMIT;
+				"executer: " + new String(msg.getBody()) + " commit");
+		return LocalTransactionState.COMMIT_MESSAGE;
 	}
 
 	@Override
-	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
-		System.out.println("check: " + new String((byte[]) msg.getPayload()));
-		return RocketMQLocalTransactionState.COMMIT;
+	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+		System.out.println("check: " + new String(msg.getBody()));
+		return LocalTransactionState.COMMIT_MESSAGE;
 	}
 
 }
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties
index 772bf456e..08ab26ca0 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties
@@ -11,6 +11,7 @@ spring.cloud.stream.bindings.output2.destination=TransactionTopic
 spring.cloud.stream.bindings.output2.content-type=application/json
 spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
 spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup
+spring.cloud.stream.rocketmq.bindings.output2.producer.transactionListener=myTransactionListener
 
 spring.cloud.stream.bindings.output3.destination=pull-topic
 spring.cloud.stream.bindings.output3.content-type=text/plain
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java
index e83a6a172..694c8519c 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java
@@ -18,18 +18,11 @@ package com.alibaba.cloud.stream.binder.rocketmq.contants;
 
 import org.apache.rocketmq.common.message.MessageConst;
 
-import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
-
 /**
  * @author zkzlx
  */
 public class RocketMQConst extends MessageConst {
 
-	/**
-	 * Header key for RocketMQ Transactional Args.
-	 */
-	public static final String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
-
 	/**
 	 * Default NameServer value.
 	 */
@@ -38,12 +31,8 @@ public class RocketMQConst extends MessageConst {
 	/**
 	 * Default group for SCS RocketMQ Binder.
 	 */
-	public static final String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
+	public static final String DEFAULT_GROUP =  "binder_default_group_name";
 
-	/**
-	 * RocketMQ re-consume times.
-	 */
-	public static final String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
 
 	public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS";
 
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
index 2ca91384b..a378fc615 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
@@ -17,6 +17,7 @@
 package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull;
 
 import java.lang.reflect.Field;
+import java.util.Iterator;
 import java.util.List;
 
 import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQConsumerFactory;
@@ -62,6 +63,8 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
 	private final MessageSelector messageSelector;
 	private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;
 
+	private volatile Iterator<MessageExt> messageExtIterator=null;
+
 	public RocketMQMessageSource(String name,
 			ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
 		this.topic = name;
@@ -82,7 +85,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
 			this.consumer = RocketMQConsumerFactory
 					.initPullConsumer(extendedConsumerProperties);
 			// This parameter must be 1, otherwise doReceive cannot be handled singly.
-			this.consumer.setPullBatchSize(1);
+//			this.consumer.setPullBatchSize(1);
 			this.consumer.subscribe(topic, messageSelector);
 			this.consumer.setAutoCommit(false);
 			this.assignedMessageQueue = acquireAssignedMessageQueue(this.consumer);
@@ -132,11 +135,20 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
 
 	@Override
 	protected synchronized Object doReceive() {
-		List<MessageExt> messageExtList = consumer.poll();
-		if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) {
+		if(messageExtIterator == null){
+			List<MessageExt> messageExtList = consumer.poll();
+			if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) {
+				return null;
+			}
+			messageExtIterator = messageExtList.iterator();
+		}
+		MessageExt messageExt=messageExtIterator.next();
+		if(!messageExtIterator.hasNext()){
+			messageExtIterator = null;
+		}
+		if(null == messageExt){
 			return null;
 		}
-		MessageExt messageExt = messageExtList.get(0);
 		MessageQueue messageQueue = null;
 		for (MessageQueue queue : assignedMessageQueue.getAssignedMessageQueues()) {
 			if (queue.getQueueId() == messageExt.getQueueId()) {
@@ -144,8 +156,11 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
 				break;
 			}
 		}
+		if(messageQueue == null){
+			throw new IllegalArgumentException("The message queue is not in assigned list");
+		}
 		Message message = RocketMQMessageConverterSupport
-				.convertMessage2Spring(messageExtList.get(0));
+				.convertMessage2Spring(messageExt);
 		return MessageBuilder.fromMessage(message)
 				.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
 						new RocketMQAckCallback(this.consumer, assignedMessageQueue,
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java
index 66b58f797..3d58b8af0 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java
@@ -160,7 +160,7 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler
 								TransactionListener.class);
 				if (transactionListener == null) {
 					throw new MessagingException(
-							"TransactionMQProducer must have a TransactionMQProducer !!! ");
+							"TransactionMQProducer must have a TransactionListener !!! ");
 				}
 				((TransactionMQProducer) defaultMQProducer)
 						.setTransactionListener(transactionListener);
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java
index 3f7898422..85165955d 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java
@@ -16,6 +16,7 @@
 
 package com.alibaba.cloud.stream.binder.rocketmq.utils;
 
+import com.alibaba.cloud.stream.binder.rocketmq.contants.RocketMQConst;
 import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
 import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQCommonProperties;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
@@ -81,7 +82,7 @@ public class RocketMQUtils {
 
 	public static String getNameServerStr(String nameServer) {
 		if (StringUtils.isEmpty(nameServer)) {
-			return null;
+			return RocketMQConst.DEFAULT_NAME_SERVER;
 		}
 		return nameServer.replaceAll(",", ";");
 	}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java
index 8207f9893..02d0de9f9 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java
@@ -18,7 +18,7 @@ package com.alibaba.cloud.stream.binder.rocketmq;
 
 import java.util.Arrays;
 
-import com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
+import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration;
 import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
 import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
 import org.junit.Test;
@@ -37,20 +37,20 @@ public class RocketMQAutoConfigurationTests {
 			.withConfiguration(
 					AutoConfigurations.of(RocketMQBinderAutoConfiguration.class))
 			.withPropertyValues(
-					"spring.cloud.stream.rocketmq.binder.name-server[0]=127.0.0.1:9876",
-					"spring.cloud.stream.rocketmq.binder.name-server[1]=127.0.0.1:9877",
+					"spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876,127.0.0.1:9877",
 					"spring.cloud.stream.bindings.output.destination=TopicOrderTest",
 					"spring.cloud.stream.bindings.output.content-type=application/json",
+
 					"spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
 					"spring.cloud.stream.bindings.input1.content-type=application/json",
 					"spring.cloud.stream.bindings.input1.group=test-group1",
-					"spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true",
+					"spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true",
 					"spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",
 					"spring.cloud.stream.bindings.input2.destination=TopicOrderTest",
 					"spring.cloud.stream.bindings.input2.content-type=application/json",
 					"spring.cloud.stream.bindings.input2.group=test-group2",
-					"spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false",
-					"spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1");
+					"spring.cloud.stream.rocketmq.bindings.input2.consumer.push.orderly=false",
+					"spring.cloud.stream.rocketmq.bindings.input2.consumer.subscription=tag1");
 
 	@Test
 	public void testProperties() {
@@ -58,16 +58,16 @@ public class RocketMQAutoConfigurationTests {
 			RocketMQBinderConfigurationProperties binderConfigurationProperties = context
 					.getBean(RocketMQBinderConfigurationProperties.class);
 			assertThat(binderConfigurationProperties.getNameServer())
-					.isEqualTo(Arrays.asList("127.0.0.1:9876", "127.0.0.1:9877"));
+					.isEqualTo("127.0.0.1:9876,127.0.0.1:9877");
 			RocketMQExtendedBindingProperties bindingProperties = context
 					.getBean(RocketMQExtendedBindingProperties.class);
 			assertThat(
-					bindingProperties.getExtendedConsumerProperties("input2").getTags())
+					bindingProperties.getExtendedConsumerProperties("input2").getSubscription())
 							.isEqualTo("tag1");
-			assertThat(bindingProperties.getExtendedConsumerProperties("input2")
-					.getOrderly()).isFalse();
+			assertThat(bindingProperties.getExtendedConsumerProperties("input2").getPush().getOrderly()
+					).isFalse();
 			assertThat(bindingProperties.getExtendedConsumerProperties("input1")
-					.getOrderly()).isTrue();
+					.getPush().getOrderly()).isTrue();
 		});
 	}