From 2b25b1a338493fe8c2e9d431356b9ad6c87305bf Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Tue, 20 Nov 2018 14:21:31 +0800 Subject: [PATCH] add rocketmq starter and fix some small problems --- spring-cloud-alibaba-dependencies/pom.xml | 11 +++ spring-cloud-starter-alibaba/pom.xml | 1 + .../pom.xml | 20 ++++++ .../rocketmq/RocketMQBinderConstants.java | 24 +++---- .../RocketMQInboundChannelAdapter.java | 3 +- .../RocketMQAutoConfigurationTests.java | 72 +++++++++++++++++++ 6 files changed, 118 insertions(+), 13 deletions(-) create mode 100644 spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml create mode 100644 spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index 6065685c5..07dd56440 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -183,6 +183,11 @@ spring-cloud-alicloud-context ${project.version} + + org.springframework.cloud + spring-cloud-stream-binder-rocketmq + ${project.version} + @@ -220,6 +225,12 @@ ${project.version} + + org.springframework.cloud + spring-cloud-starter-stream-rocketmq + ${project.version} + + io.dropwizard.metrics diff --git a/spring-cloud-starter-alibaba/pom.xml b/spring-cloud-starter-alibaba/pom.xml index 5223b1d61..c9bb0bb31 100644 --- a/spring-cloud-starter-alibaba/pom.xml +++ b/spring-cloud-starter-alibaba/pom.xml @@ -14,5 +14,6 @@ spring-cloud-starter-alibaba-nacos-config spring-cloud-starter-alibaba-nacos-discovery spring-cloud-starter-alibaba-sentinel + spring-cloud-starter-stream-rocketmq \ No newline at end of file diff --git a/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml b/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml new file mode 100644 index 000000000..5b8ba75eb --- /dev/null +++ b/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml @@ -0,0 +1,20 @@ + + 4.0.0 + + + org.springframework.cloud + spring-cloud-starter-alibaba + 0.2.1.BUILD-SNAPSHOT + + spring-cloud-starter-stream-rocketmq + Spring Cloud Starter Stream RocketMQ + + + + org.springframework.cloud + spring-cloud-stream-binder-rocketmq + + + + diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index 4086f7725..3ee0335c2 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -5,22 +5,22 @@ package org.springframework.cloud.stream.binder.rocketmq; */ public interface RocketMQBinderConstants { - /** - * Header key - */ - String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKET_MESSAGE"; + /** + * Header key + */ + String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE"; - String ROCKET_FLAG = "ROCKETMQ_FLAG"; + String ROCKET_FLAG = "ROCKETMQ_FLAG"; - String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; + String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; - String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; + String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; - /** - * Instrumentation key - */ - String LASTSEND_TIMESTAMP = "lastSend.timestamp"; + /** + * Instrumentation key + */ + String LASTSEND_TIMESTAMP = "lastSend.timestamp"; - String ENDPOINT_ID = "rocketmq-binder"; + String ENDPOINT_ID = "rocketmq-binder"; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index c4a4e8d90..8f3ddacb6 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -43,6 +43,8 @@ import org.springframework.util.StringUtils; */ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { + private static final Logger logger = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class); + private ConsumerInstrumentation consumerInstrumentation; private final ExtendedConsumerProperties consumerProperties; @@ -132,7 +134,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { } protected class CloudStreamMessageListener implements MessageListener, RetryListener { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final InstrumentationManager instrumentationManager; diff --git a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java new file mode 100644 index 000000000..9b1be23f5 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration; +import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; + +/** + * @author Jim + */ +public class RocketMQAutoConfigurationTests { + + private ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of(RocketMQBinderEndpointAutoConfiguration.class, + RocketMQBinderAutoConfiguration.class)) + .withPropertyValues( + "spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876", + "spring.cloud.stream.bindings.output.destination=TopicOrderTest", + "spring.cloud.stream.bindings.output.content-type=application/json", + "spring.cloud.stream.bindings.input1.destination=TopicOrderTest", + "spring.cloud.stream.bindings.input1.content-type=application/json", + "spring.cloud.stream.bindings.input1.group=test-group1", + "spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true", + "spring.cloud.stream.bindings.input1.consumer.maxAttempts=1", + "spring.cloud.stream.bindings.input2.destination=TopicOrderTest", + "spring.cloud.stream.bindings.input2.content-type=application/json", + "spring.cloud.stream.bindings.input2.group=test-group2", + "spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false", + "spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1"); + + @Test + public void testProperties() { + this.contextRunner.run(context -> { + RocketMQBinderConfigurationProperties binderConfigurationProperties = context + .getBean(RocketMQBinderConfigurationProperties.class); + assertThat(binderConfigurationProperties.getNamesrvAddr()) + .isEqualTo("127.0.0.1:9876"); + RocketMQExtendedBindingProperties bindingProperties = context + .getBean(RocketMQExtendedBindingProperties.class); + assertThat( + bindingProperties.getExtendedConsumerProperties("input2").getTags()) + .isEqualTo("tag1"); + assertThat(bindingProperties.getExtendedConsumerProperties("input2") + .getOrderly()).isFalse(); + assertThat(bindingProperties.getExtendedConsumerProperties("input1") + .getOrderly()).isTrue(); + }); + } + +} \ No newline at end of file