From 3263d203c2e2cc99ef595c14efd0266abd97f23a Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Tue, 27 Nov 2018 11:36:29 +0800 Subject: [PATCH] add rocketmq-binder and starter --- pom.xml | 1 + spring-cloud-alibaba-dependencies/pom.xml | 26 ++ spring-cloud-starter-alibaba/pom.xml | 1 + .../pom.xml | 20 ++ spring-cloud-stream-binder-rocketmq/pom.xml | 71 ++++ .../rocketmq/RocketMQBinderConstants.java | 42 +++ .../RocketMQMessageChannelBinder.java | 117 +++++++ .../RocketMQMessageHeaderAccessor.java | 123 +++++++ .../actuator/RocketMQBinderEndpoint.java | 58 ++++ .../RocketMQBinderHealthIndicator.java | 66 ++++ .../RocketMQBinderAutoConfiguration.java | 70 ++++ ...cketMQBinderEndpointAutoConfiguration.java | 52 +++ .../rocketmq/consuming/Acknowledgement.java | 94 ++++++ .../rocketmq/consuming/ConsumersManager.java | 122 +++++++ .../RocketMQInboundChannelAdapter.java | 314 ++++++++++++++++++ .../integration/RocketMQMessageHandler.java | 160 +++++++++ .../metrics/ConsumerGroupInstrumentation.java | 33 ++ .../metrics/ConsumerInstrumentation.java | 53 +++ .../rocketmq/metrics/Instrumentation.java | 66 ++++ .../metrics/InstrumentationManager.java | 89 +++++ .../metrics/ProducerInstrumentation.java | 53 +++ ...RocketMQBinderConfigurationProperties.java | 48 +++ .../properties/RocketMQBindingProperties.java | 44 +++ .../RocketMQConsumerProperties.java | 95 ++++++ .../RocketMQExtendedBindingProperties.java | 80 +++++ .../RocketMQProducerProperties.java | 51 +++ .../RocketMQTopicProvisioner.java | 105 ++++++ .../main/resources/META-INF/spring.binders | 1 + .../main/resources/META-INF/spring.factories | 2 + .../RocketMQAutoConfigurationTests.java | 73 ++++ 30 files changed, 2130 insertions(+) create mode 100644 spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml create mode 100644 spring-cloud-stream-binder-rocketmq/pom.xml create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories create mode 100644 spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java diff --git a/pom.xml b/pom.xml index 70dc9d4a6..053e6e20a 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,7 @@ spring-cloud-alibaba-sentinel-datasource spring-cloud-alibaba-nacos-config spring-cloud-alibaba-nacos-discovery + spring-cloud-stream-binder-rocketmq spring-cloud-alibaba-examples spring-cloud-alibaba-test spring-cloud-alibaba-docs diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index c89ae4889..38c62d125 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -24,6 +24,8 @@ 4.0.1 1.0.0 2.16.0 + 4.3.1 + 3.2.6 @@ -115,6 +117,11 @@ sentinel-dubbo-api ${project.version} + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + @@ -166,6 +173,11 @@ spring-cloud-alicloud-context ${project.version} + + org.springframework.cloud + spring-cloud-stream-binder-rocketmq + ${project.version} + @@ -202,6 +214,20 @@ spring-cloud-starter-alicloud-acm ${project.version} + + + org.springframework.cloud + spring-cloud-starter-stream-rocketmq + ${project.version} + + + + + io.dropwizard.metrics + metrics-core + ${metrics.core} + + diff --git a/spring-cloud-starter-alibaba/pom.xml b/spring-cloud-starter-alibaba/pom.xml index d455bf87c..09aa60bd4 100644 --- a/spring-cloud-starter-alibaba/pom.xml +++ b/spring-cloud-starter-alibaba/pom.xml @@ -17,6 +17,7 @@ spring-cloud-starter-alibaba-nacos-config spring-cloud-starter-alibaba-nacos-discovery spring-cloud-starter-alibaba-sentinel + spring-cloud-starter-stream-rocketmq diff --git a/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml b/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml new file mode 100644 index 000000000..d4f2eedde --- /dev/null +++ b/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml @@ -0,0 +1,20 @@ + + 4.0.0 + + + org.springframework.cloud + spring-cloud-starter-alibaba + 0.1.1.BUILD-SNAPSHOT + + spring-cloud-starter-stream-rocketmq + Spring Cloud Starter Stream RocketMQ + + + + org.springframework.cloud + spring-cloud-stream-binder-rocketmq + + + + diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml new file mode 100644 index 000000000..4b534d4b8 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/pom.xml @@ -0,0 +1,71 @@ + + + + + org.springframework.cloud + spring-cloud-alibaba + 0.1.1.BUILD-SNAPSHOT + + 4.0.0 + + org.springframework.cloud + spring-cloud-stream-binder-rocketmq + Spring Cloud Alibaba RocketMQ Binder + + + + + org.springframework.cloud + spring-cloud-stream + + + + io.dropwizard.metrics + metrics-core + + + + org.apache.rocketmq + rocketmq-client + + + + org.springframework.boot + spring-boot-configuration-processor + provided + true + + + + org.springframework.boot + spring-boot + provided + true + + + + org.springframework.boot + spring-boot-autoconfigure + provided + true + + + + org.springframework.boot + spring-boot-actuator + provided + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java new file mode 100644 index 000000000..cb35b5a65 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq; + +/** + * @author Jim + */ +public interface RocketMQBinderConstants { + + /** + * Header key + */ + String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE"; + + String ROCKET_FLAG = "ROCKETMQ_FLAG"; + + String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; + + String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; + + /** + * Instrumentation key + */ + String LASTSEND_TIMESTAMP = "lastSend.timestamp"; + + String ENDPOINT_ID = "rocketmq_binder"; + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java new file mode 100644 index 000000000..4bed86afc --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; +import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; +import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; +import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.integration.core.MessageProducer; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQMessageChannelBinder extends + AbstractMessageChannelBinder, + ExtendedProducerProperties, RocketMQTopicProvisioner> + implements ExtendedPropertiesBinder { + + private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class); + + private final RocketMQExtendedBindingProperties extendedBindingProperties; + private final RocketMQTopicProvisioner rocketTopicProvisioner; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private final InstrumentationManager instrumentationManager; + private final ConsumersManager consumersManager; + + public RocketMQMessageChannelBinder(ConsumersManager consumersManager, + RocketMQExtendedBindingProperties extendedBindingProperties, + RocketMQTopicProvisioner provisioningProvider, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + InstrumentationManager instrumentationManager) { + super(true, null, provisioningProvider); + this.consumersManager = consumersManager; + this.extendedBindingProperties = extendedBindingProperties; + this.rocketTopicProvisioner = provisioningProvider; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.instrumentationManager = instrumentationManager; + } + + @Override + protected MessageHandler createProducerMessageHandler(ProducerDestination destination, + ExtendedProducerProperties + producerProperties, + MessageChannel errorChannel) throws Exception { + if (producerProperties.getExtension().getEnabled()) { + return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(), + rocketBinderConfigurationProperties, instrumentationManager); + } else { + throw new RuntimeException( + "Binding for channel " + destination.getName() + "has been disabled, message can't be delivered"); + } + } + + @Override + protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, + ExtendedConsumerProperties + consumerProperties) + throws Exception { + if (group == null || "".equals(group)) { + throw new RuntimeException("'group' must be configured for channel + " + destination.getName()); + } + + RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager, + consumerProperties, destination.getName(), group, instrumentationManager); + + ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, + consumerProperties); + if (consumerProperties.getMaxAttempts() > 1) { + rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties)); + rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer()); + } else { + rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel()); + } + + return rocketInboundChannelAdapter; + } + + @Override + public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { + return extendedBindingProperties.getExtendedConsumerProperties(channelName); + } + + @Override + public RocketMQProducerProperties getExtendedProducerProperties(String channelName) { + return extendedBindingProperties.getExtendedProducerProperties(channelName); + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java new file mode 100644 index 000000000..fa480681a --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement; +import org.springframework.integration.support.MutableMessage; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageHeaderAccessor; + +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG; +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor { + + public RocketMQMessageHeaderAccessor() { + super(); + } + + public RocketMQMessageHeaderAccessor(Message message) { + super(message); + } + + public Acknowledgement getAcknowledgement(Message message) { + return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class); + } + + public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) { + setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment); + return this; + } + + public String getTags() { + return getMessageHeaders().get(MessageConst.PROPERTY_TAGS) == null ? "" : (String) getMessageHeaders().get(MessageConst.PROPERTY_TAGS); + } + + public RocketMQMessageHeaderAccessor withTags(String tag) { + setHeader(MessageConst.PROPERTY_TAGS, tag); + return this; + } + + public String getKeys() { + return getMessageHeaders().get(MessageConst.PROPERTY_KEYS) == null ? "" : (String) getMessageHeaders().get(MessageConst.PROPERTY_KEYS); + } + + public RocketMQMessageHeaderAccessor withKeys(String keys) { + setHeader(MessageConst.PROPERTY_KEYS, keys); + return this; + } + + public MessageExt getRocketMessage() { + return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class); + } + + public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) { + setHeader(ORIGINAL_ROCKET_MESSAGE, message); + return this; + } + + public Integer getDelayTimeLevel() { + return NumberUtils.toInt((String)getMessageHeaders().get(MessageConst.PROPERTY_DELAY_TIME_LEVEL), 0); + } + + public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) { + setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); + return this; + } + + public Integer getFlag() { + return NumberUtils.toInt((String)getMessageHeaders().get(ROCKET_FLAG), 0); + } + + public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) { + setHeader(ROCKET_FLAG, delayTimeLevel); + return this; + } + + public SendResult getSendResult() { + return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class); + } + + public static void putSendResult(MutableMessage message, SendResult sendResult) { + message.getHeaders().put(ROCKET_SEND_RESULT, sendResult); + } + + public Map getUserProperties() { + Map result = new HashMap<>(); + for (Map.Entry entry : this.toMap().entrySet()) { + if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry + .getKey().equals(MessageHeaders.CONTENT_TYPE)) { + result.put(entry.getKey(), (String)entry.getValue()); + } + } + return result; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java new file mode 100644 index 000000000..756924beb --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.actuator; + +import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.boot.actuate.endpoint.AbstractEndpoint; + +import com.codahale.metrics.MetricRegistry; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQBinderEndpoint extends AbstractEndpoint> { + + private MetricRegistry metricRegistry = new MetricRegistry(); + private Map runtime = new ConcurrentHashMap<>(); + + public RocketMQBinderEndpoint() { + super(ENDPOINT_ID); + } + + @Override + public Map invoke() { + Map result = new HashMap<>(); + result.put("metrics", metricRegistry().getMetrics()); + result.put("runtime", runtime()); + return result; + } + + public MetricRegistry metricRegistry() { + return metricRegistry; + } + + public Map runtime() { + return runtime; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java new file mode 100644 index 000000000..f23463acf --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.actuator; + +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { + + private final InstrumentationManager instrumentationManager; + + public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) { + this.instrumentationManager = instrumentationManager; + } + + @Override + protected void doHealthCheck(Health.Builder builder) throws Exception { + int upCount = 0, outOfServiceCount = 0; + for (Instrumentation instrumentation : instrumentationManager + .getHealthInstrumentations()) { + if (instrumentation.isUp()) { + upCount++; + } + else if (instrumentation.isOutOfService()) { + upCount++; + } + } + if (upCount == instrumentationManager.getHealthInstrumentations().size()) { + builder.up(); + return; + } + else if (outOfServiceCount == instrumentationManager.getHealthInstrumentations() + .size()) { + builder.outOfService(); + return; + } + builder.down(); + + for (Instrumentation instrumentation : instrumentationManager + .getHealthInstrumentations()) { + if (!instrumentation.isStarted()) { + builder.withException(instrumentation.getStartException()); + } + } + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java new file mode 100644 index 000000000..f2bb68cff --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.config; + +import org.apache.rocketmq.client.log.ClientLogger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; +import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; +import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Timur Valiev + * @author Jim + */ +@Configuration +@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class}) +public class RocketMQBinderAutoConfiguration { + + private final RocketMQExtendedBindingProperties extendedBindingProperties; + + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + + @Autowired + public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { + this.extendedBindingProperties = extendedBindingProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel()); + } + + @Bean + public RocketMQTopicProvisioner provisioningProvider() { + return new RocketMQTopicProvisioner(); + } + + @Bean + public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider, + InstrumentationManager instrumentationManager, + ConsumersManager consumersManager) { + RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties, + provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager); + return binder; + } + + @Bean + public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) { + return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties); + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java new file mode 100644 index 000000000..a53865096 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.config; + +import org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint; +import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Jim + */ +@Configuration +@AutoConfigureAfter(EndpointAutoConfiguration.class) +public class RocketMQBinderEndpointAutoConfiguration { + + @Bean + public RocketMQBinderEndpoint rocketBinderEndpoint() { + return new RocketMQBinderEndpoint(); + } + + @Bean + public RocketMQBinderHealthIndicator rocketBinderHealthIndicator( + InstrumentationManager instrumentationManager) { + return new RocketMQBinderHealthIndicator(instrumentationManager); + } + + @Bean + public InstrumentationManager instrumentationManager( + RocketMQBinderEndpoint rocketBinderEndpoint) { + return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), + rocketBinderEndpoint.runtime()); + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java new file mode 100644 index 000000000..207dbe50d --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; + +/** + * @author Timur Valiev + * @author Jim + */ +public class Acknowledgement { + + /** + * for {@link ConsumeConcurrentlyContext} using + */ + private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + /** + * Message consume retry strategy
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency + */ + private Integer consumeConcurrentlyDelayLevel = 0; + + /** + * for {@link ConsumeOrderlyContext} using + */ + private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS; + private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L; + + public Acknowledgement setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus consumeConcurrentlyStatus) { + this.consumeConcurrentlyStatus = consumeConcurrentlyStatus; + return this; + } + + public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() { + return consumeConcurrentlyStatus; + } + + public ConsumeOrderlyStatus getConsumeOrderlyStatus() { + return consumeOrderlyStatus; + } + + public Acknowledgement setConsumeOrderlyStatus(ConsumeOrderlyStatus consumeOrderlyStatus) { + this.consumeOrderlyStatus = consumeOrderlyStatus; + return this; + } + + public Integer getConsumeConcurrentlyDelayLevel() { + return consumeConcurrentlyDelayLevel; + } + + public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) { + this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel; + } + + public Long getConsumeOrderlySuspendCurrentQueueTimeMill() { + return consumeOrderlySuspendCurrentQueueTimeMill; + } + + public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) { + this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill; + } + + public static Acknowledgement buildOrderlyInstance() { + Acknowledgement acknowledgement = new Acknowledgement(); + acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS); + return acknowledgement; + } + + public static Acknowledgement buildConcurrentlyInstance() { + Acknowledgement acknowledgement = new Acknowledgement(); + acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS); + return acknowledgement; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java new file mode 100644 index 000000000..6b9445f93 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; + +/** + * @author Timur Valiev + * @author Jim + */ +public class ConsumersManager { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final Map consumerGroups = new HashMap<>(); + private final Map started = new HashMap<>(); + private final Map, ExtendedConsumerProperties> propertiesMap + = new HashMap<>(); + private final InstrumentationManager instrumentationManager; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + + public ConsumersManager(InstrumentationManager instrumentationManager, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { + this.instrumentationManager = instrumentationManager; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + } + + public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic, + ExtendedConsumerProperties consumerProperties) { + propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties); + ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group); + instrumentationManager.addHealthInstrumentation(instrumentation); + + if (consumerGroups.containsKey(group)) { + return consumerGroups.get(group); + } + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); + consumerGroups.put(group, consumer); + started.put(group, false); + consumer.setConsumeThreadMax(consumerProperties.getConcurrency()); + consumer.setConsumeThreadMin(consumerProperties.getConcurrency()); + if (consumerProperties.getExtension().getBroadcasting()) { + consumer.setMessageModel(MessageModel.BROADCASTING); + } + logger.info("RocketMQ consuming for SCS group {} created", group); + return consumer; + } + + public synchronized void startConsumers() throws MQClientException { + for (String group : getConsumerGroups()) { + start(group); + } + } + + public synchronized void startConsumer(String group) throws MQClientException { + start(group); + } + + public synchronized void stopConsumer(String group) { + stop(group); + } + + private void stop(String group) { + if (consumerGroups.get(group) != null) { + consumerGroups.get(group).shutdown(); + started.put(group, false); + } + } + + private synchronized void start(String group) throws MQClientException { + if (started.get(group)) { + return; + } + ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation( + group); + instrumentationManager.addHealthInstrumentation(groupInstrumentation); + try { + consumerGroups.get(group).start(); + started.put(group, true); + groupInstrumentation.markStartedSuccessfully(); + } catch (MQClientException e) { + groupInstrumentation.markStartFailed(e); + logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e); + throw e; + } + } + + public synchronized Set getConsumerGroups() { + return consumerGroups.keySet(); + } +} + diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java new file mode 100644 index 000000000..9d8194621 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -0,0 +1,314 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.integration; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang3.ClassUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; +import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement; +import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; +import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.retry.RecoveryCallback; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.StringUtils; + +/** + * @author Jim + */ +public class RocketMQInboundChannelAdapter extends MessageProducerSupport { + + private static final Logger logger = LoggerFactory + .getLogger(RocketMQInboundChannelAdapter.class); + + private ConsumerInstrumentation consumerInstrumentation; + + private final ExtendedConsumerProperties consumerProperties; + + private final String destination; + + private final String group; + + private final InstrumentationManager instrumentationManager; + + private final ConsumersManager consumersManager; + + private RetryTemplate retryTemplate; + + private RecoveryCallback recoveryCallback; + + public RocketMQInboundChannelAdapter(ConsumersManager consumersManager, + ExtendedConsumerProperties consumerProperties, + String destination, String group, + InstrumentationManager instrumentationManager) { + this.consumersManager = consumersManager; + this.consumerProperties = consumerProperties; + this.destination = destination; + this.group = group; + this.instrumentationManager = instrumentationManager; + } + + @Override + protected void doStart() { + if (!consumerProperties.getExtension().getEnabled()) { + return; + } + + String tags = consumerProperties == null ? null + : consumerProperties.getExtension().getTags(); + Boolean isOrderly = consumerProperties == null ? false + : consumerProperties.getExtension().getOrderly(); + + DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, + destination, consumerProperties); + + final CloudStreamMessageListener listener = isOrderly + ? new CloudStreamMessageListenerOrderly(instrumentationManager) + : new CloudStreamMessageListenerConcurrently(instrumentationManager); + + if (retryTemplate != null) { + retryTemplate.registerListener(listener); + } + + Set tagsSet = new HashSet<>(); + if (!StringUtils.isEmpty(tags)) { + for (String tag : tags.split("\\|\\|")) { + tagsSet.add(tag.trim()); + } + } + + consumerInstrumentation = instrumentationManager + .getConsumerInstrumentation(destination); + instrumentationManager.addHealthInstrumentation(consumerInstrumentation); + + try { + if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { + consumer.subscribe(destination, MessageSelector + .bySql(consumerProperties.getExtension().getSql())); + } + else { + consumer.subscribe(destination, + org.apache.commons.lang3.StringUtils.join(tagsSet, " || ")); + } + consumerInstrumentation.markStartedSuccessfully(); + } + catch (MQClientException e) { + consumerInstrumentation.markStartFailed(e); + logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + + e.getErrorMessage(), e); + throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); + } + + consumer.registerMessageListener(listener); + + try { + consumersManager.startConsumer(group); + } + catch (MQClientException e) { + logger.error( + "RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), + e); + throw new RuntimeException("RocketMQ Consumer startup failed.", e); + } + } + + @Override + protected void doStop() { + consumersManager.stopConsumer(group); + } + + public void setRetryTemplate(RetryTemplate retryTemplate) { + this.retryTemplate = retryTemplate; + } + + public void setRecoveryCallback(RecoveryCallback recoveryCallback) { + this.recoveryCallback = recoveryCallback; + } + + protected class CloudStreamMessageListener implements MessageListener, RetryListener { + + private final InstrumentationManager instrumentationManager; + + CloudStreamMessageListener(InstrumentationManager instrumentationManager) { + this.instrumentationManager = instrumentationManager; + } + + Acknowledgement consumeMessage(final List msgs) { + boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; + try { + if (enableRetry) { + return RocketMQInboundChannelAdapter.this.retryTemplate.execute( + // (RetryCallback) + new RetryCallback() { + @Override + public Acknowledgement doWithRetry(RetryContext context) + throws Exception { + return doSendMsgs(msgs, context); + } + }, new RecoveryCallback() { + @Override + public Acknowledgement recover(RetryContext context) + throws Exception { + RocketMQInboundChannelAdapter.this.recoveryCallback + .recover(context); + if (ClassUtils.isAssignable(this.getClass(), + MessageListenerConcurrently.class)) { + return Acknowledgement + .buildConcurrentlyInstance(); + } + else { + return Acknowledgement.buildOrderlyInstance(); + } + } + }); + } + else { + Acknowledgement result = doSendMsgs(msgs, null); + instrumentationManager + .getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumed(); + return result; + } + } + catch (Exception e) { + logger.error( + "Rocket Message hasn't been processed successfully. Caused by ", + e); + instrumentationManager + .getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumedFailure(); + throw new RuntimeException( + "Rocket Message hasn't been processed successfully. Caused by ", + e); + } + } + + private Acknowledgement doSendMsgs(final List msgs, + RetryContext context) { + List acknowledgements = new ArrayList<>(); + for (MessageExt msg : msgs) { + String retryInfo = context == null ? "" + : "retryCount-" + String.valueOf(context.getRetryCount()) + "|"; + logger.debug(retryInfo + "consuming msg:\n" + msg); + logger.debug(retryInfo + "message body:\n" + new String(msg.getBody())); + Acknowledgement acknowledgement = new Acknowledgement(); + Message toChannel = MessageBuilder.withPayload(msg.getBody()) + .setHeaders(new RocketMQMessageHeaderAccessor() + .withAcknowledgment(acknowledgement) + .withTags(msg.getTags()).withKeys(msg.getKeys()) + .withFlag(msg.getFlag()).withRocketMessage(msg)) + .build(); + acknowledgements.add(acknowledgement); + RocketMQInboundChannelAdapter.this.sendMessage(toChannel); + } + return acknowledgements.get(0); + } + + @Override + public boolean open(RetryContext context, + RetryCallback callback) { + return true; + } + + @Override + public void close(RetryContext context, + RetryCallback callback, Throwable throwable) { + if (throwable != null) { + instrumentationManager + .getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumedFailure(); + } + else { + instrumentationManager + .getConsumerInstrumentation( + RocketMQInboundChannelAdapter.this.destination) + .markConsumed(); + } + } + + @Override + public void onError(RetryContext context, + RetryCallback callback, Throwable throwable) { + } + } + + protected class CloudStreamMessageListenerConcurrently + extends CloudStreamMessageListener implements MessageListenerConcurrently { + + public CloudStreamMessageListenerConcurrently( + InstrumentationManager instrumentationManager) { + super(instrumentationManager); + } + + @Override + public ConsumeConcurrentlyStatus consumeMessage(final List msgs, + ConsumeConcurrentlyContext context) { + Acknowledgement acknowledgement = consumeMessage(msgs); + context.setDelayLevelWhenNextConsume( + acknowledgement.getConsumeConcurrentlyDelayLevel()); + return acknowledgement.getConsumeConcurrentlyStatus(); + } + } + + protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener + implements MessageListenerOrderly { + + public CloudStreamMessageListenerOrderly( + InstrumentationManager instrumentationManager) { + super(instrumentationManager); + } + + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, + ConsumeOrderlyContext context) { + Acknowledgement acknowledgement = consumeMessage(msgs); + context.setSuspendCurrentQueueTimeMillis( + (acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill())); + return acknowledgement.getConsumeOrderlyStatus(); + } + + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java new file mode 100644 index 000000000..88273e8c1 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.integration; + +import java.util.Map; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; +import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import org.springframework.context.Lifecycle; +import org.springframework.integration.handler.AbstractMessageHandler; +import org.springframework.integration.support.MutableMessage; +import org.springframework.messaging.MessagingException; + +/** + * @author Jim + */ +public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle { + + private DefaultMQProducer producer; + + private ProducerInstrumentation producerInstrumentation; + + private final RocketMQProducerProperties producerProperties; + + private final String destination; + + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + + private final InstrumentationManager instrumentationManager; + + protected volatile boolean running = false; + + public RocketMQMessageHandler(String destination, + RocketMQProducerProperties producerProperties, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + InstrumentationManager instrumentationManager) { + this.destination = destination; + this.producerProperties = producerProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.instrumentationManager = instrumentationManager; + } + + @Override + public void start() { + producer = new DefaultMQProducer(destination); + + producerInstrumentation = instrumentationManager + .getProducerInstrumentation(destination); + instrumentationManager.addHealthInstrumentation(producerInstrumentation); + + producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); + + if (producerProperties.getMaxMessageSize() > 0) { + producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); + } + + try { + producer.start(); + producerInstrumentation.markStartedSuccessfully(); + } + catch (MQClientException e) { + producerInstrumentation.markStartFailed(e); + logger.error( + "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); + throw new MessagingException(e.getMessage(), e); + } + running = true; + } + + @Override + public void stop() { + if (producer != null) { + producer.shutdown(); + } + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + protected void handleMessageInternal(org.springframework.messaging.Message message) + throws Exception { + try { + Message toSend; + if (message.getPayload() instanceof byte[]) { + toSend = new Message(destination, (byte[]) message.getPayload()); + } + else if (message.getPayload() instanceof String) { + toSend = new Message(destination, + ((String) message.getPayload()).getBytes()); + } + else { + throw new UnsupportedOperationException("Payload class isn't supported: " + + message.getPayload().getClass()); + } + RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor( + message); + headerAccessor.setLeaveMutable(true); + toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel()); + toSend.setTags(headerAccessor.getTags()); + toSend.setKeys(headerAccessor.getKeys()); + toSend.setFlag(headerAccessor.getFlag()); + for (Map.Entry entry : headerAccessor.getUserProperties() + .entrySet()) { + toSend.putUserProperty(entry.getKey(), entry.getValue()); + } + + SendResult sendRes = producer.send(toSend); + + if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { + throw new MQClientException("message hasn't been sent", null); + } + if (message instanceof MutableMessage) { + RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message, + sendRes); + } + instrumentationManager.getRuntime().put( + RocketMQBinderConstants.LASTSEND_TIMESTAMP, + System.currentTimeMillis()); + producerInstrumentation.markSent(); + } + catch (MQClientException | RemotingException | MQBrokerException + | InterruptedException | UnsupportedOperationException e) { + producerInstrumentation.markSentFailure(); + logger.error( + "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); + throw new MessagingException(e.getMessage(), e); + } + + } + +} \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java new file mode 100644 index 000000000..6e4386d00 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import com.codahale.metrics.MetricRegistry; + +/** + * @author Timur Valiev + * @author Jim + */ +public class ConsumerGroupInstrumentation extends Instrumentation { + private MetricRegistry metricRegistry; + + public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) { + super(name); + this.metricRegistry = metricRegistry; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java new file mode 100644 index 000000000..e773729f6 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; + +import static com.codahale.metrics.MetricRegistry.name; + +/** + * @author juven.xuxb + * @author Jim + */ +public class ConsumerInstrumentation extends Instrumentation { + + private final Counter totalConsumed; + private final Counter totalConsumedFailures; + private final Meter consumedPerSecond; + private final Meter consumedFailuresPerSecond; + + public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) { + super(baseMetricName); + this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed")); + this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond")); + this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures")); + this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond")); + } + + public void markConsumed() { + totalConsumed.inc(); + consumedPerSecond.mark(); + } + + public void markConsumedFailure() { + totalConsumedFailures.inc(); + consumedFailuresPerSecond.mark(); + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java new file mode 100644 index 000000000..c169ba83e --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author Timur Valiev + * @author Jim + */ +public class Instrumentation { + private final String name; + protected final AtomicBoolean started = new AtomicBoolean(false); + protected Exception startException = null; + + Instrumentation(String name) { + this.name = name; + } + + public boolean isDown() { + return startException != null; + } + + public boolean isUp() { + return started.get(); + } + + public boolean isOutOfService() { + return !started.get() && startException == null; + } + + public void markStartedSuccessfully() { + started.set(true); + } + + public void markStartFailed(Exception e) { + started.set(false); + startException = e; + } + + public String getName() { + return name; + } + + public boolean isStarted() { + return started.get(); + } + + public Exception getStartException() { + return startException; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java new file mode 100644 index 000000000..dea848590 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.codahale.metrics.MetricRegistry; + +/** + * @author Timur Valiev + * @author Jim + */ +public class InstrumentationManager { + private final MetricRegistry metricRegistry; + private final Map runtime; + private final Map producerInstrumentations = new HashMap<>(); + private final Map consumeInstrumentations = new HashMap<>(); + private final Map consumerGroupsInstrumentations = new HashMap<>(); + + private final Map healthInstrumentations = new HashMap<>(); + + public InstrumentationManager(MetricRegistry metricRegistry, + Map runtime) { + this.metricRegistry = metricRegistry; + this.runtime = runtime; + } + + public ProducerInstrumentation getProducerInstrumentation(String destination) { + String key = "scs-rocketmq.producer." + destination; + ProducerInstrumentation producerInstrumentation = producerInstrumentations + .get(key); + if (producerInstrumentation == null) { + producerInstrumentations.put(key, + new ProducerInstrumentation(metricRegistry, key)); + } + return producerInstrumentations.get(key); + } + + public ConsumerInstrumentation getConsumerInstrumentation(String destination) { + String key = "scs-rocketmq.consumer." + destination; + ConsumerInstrumentation consumerInstrumentation = consumeInstrumentations + .get(key); + if (consumerInstrumentation == null) { + consumeInstrumentations.put(key, + new ConsumerInstrumentation(metricRegistry, key)); + } + return consumeInstrumentations.get(key); + } + + public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) { + String key = "scs-rocketmq.consumerGroup." + group; + ConsumerGroupInstrumentation consumerGroupInstrumentation = consumerGroupsInstrumentations + .get(key); + if (consumerGroupInstrumentation == null) { + consumerGroupsInstrumentations.put(key, + new ConsumerGroupInstrumentation(metricRegistry, key)); + } + return consumerGroupsInstrumentations.get(key); + } + + public Set getHealthInstrumentations() { + return new HashSet<>(healthInstrumentations.values()); + } + + public void addHealthInstrumentation(Instrumentation instrumentation) { + healthInstrumentations.put(instrumentation.getName(), instrumentation); + } + + public Map getRuntime() { + return runtime; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java new file mode 100644 index 000000000..6fc6daf98 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; + +import static com.codahale.metrics.MetricRegistry.name; + +/** + * @author juven.xuxb + * @author Jim + */ +public class ProducerInstrumentation extends Instrumentation { + + private final Counter totalSent; + private final Counter totalSentFailures; + private final Meter sentPerSecond; + private final Meter sentFailuresPerSecond; + + public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) { + super(baseMetricName); + this.totalSent = registry.counter(name(baseMetricName, "totalSent")); + this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures")); + this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond")); + this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond")); + } + + public void markSent() { + totalSent.inc(); + sentPerSecond.mark(); + } + + public void markSentFailure() { + totalSentFailures.inc(); + sentFailuresPerSecond.mark(); + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java new file mode 100644 index 000000000..f2c028ec4 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * @author Timur Valiev + * @author Jim + */ +@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") +public class RocketMQBinderConfigurationProperties { + + private String namesrvAddr = "127.0.0.1:9876"; + + private String logLevel = "ERROR"; + + public String getNamesrvAddr() { + return namesrvAddr; + } + + public void setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + public String getLogLevel() { + return logLevel; + } + + public void setLogLevel(String logLevel) { + this.logLevel = logLevel; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java new file mode 100644 index 000000000..b3c2f194f --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.properties; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQBindingProperties { + + private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties(); + + private RocketMQProducerProperties producer = new RocketMQProducerProperties(); + + public RocketMQConsumerProperties getConsumer() { + return consumer; + } + + public void setConsumer(RocketMQConsumerProperties consumer) { + this.consumer = consumer; + } + + public RocketMQProducerProperties getProducer() { + return producer; + } + + public void setProducer(RocketMQProducerProperties producer) { + this.producer = producer; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java new file mode 100644 index 000000000..7f757586b --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.properties; + +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQConsumerProperties { + + /** + * using '||' to split tag + * {@link MQPushConsumer#subscribe(String, String)} + */ + private String tags; + + /** + * {@link MQPushConsumer#subscribe(String, MessageSelector)} + * {@link MessageSelector#bySql(String)} + */ + private String sql; + + /** + * {@link MessageModel#BROADCASTING} + */ + private Boolean broadcasting = false; + + /** + * if orderly is true, using {@link MessageListenerOrderly} + * else if orderly if false, using {@link MessageListenerConcurrently} + */ + private Boolean orderly = false; + + private Boolean enabled = true; + + public String getTags() { + return tags; + } + + public void setTags(String tags) { + this.tags = tags; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public Boolean getOrderly() { + return orderly; + } + + public void setOrderly(Boolean orderly) { + this.orderly = orderly; + } + + public Boolean getEnabled() { + return enabled; + } + + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + + public Boolean getBroadcasting() { + return broadcasting; + } + + public void setBroadcasting(Boolean broadcasting) { + this.broadcasting = broadcasting; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java new file mode 100644 index 000000000..58d9d8ef7 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.properties; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.stream.binder.ExtendedBindingProperties; + +/** + * @author Timur Valiev + * @author Jim + */ +@ConfigurationProperties("spring.cloud.stream.rocketmq") +public class RocketMQExtendedBindingProperties implements + ExtendedBindingProperties { + + private Map bindings = new HashMap<>(); + + public Map getBindings() { + return this.bindings; + } + + public void setBindings(Map bindings) { + this.bindings = bindings; + } + + @Override + public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { + if (bindings.containsKey(channelName)) { + if (bindings.get(channelName).getConsumer() != null) { + return bindings.get(channelName).getConsumer(); + } else { + RocketMQConsumerProperties properties = new RocketMQConsumerProperties(); + this.bindings.get(channelName).setConsumer(properties); + return properties; + } + } else { + RocketMQConsumerProperties properties = new RocketMQConsumerProperties(); + RocketMQBindingProperties rbp = new RocketMQBindingProperties(); + rbp.setConsumer(properties); + bindings.put(channelName, rbp); + return properties; + } + } + + @Override + public synchronized RocketMQProducerProperties getExtendedProducerProperties(String channelName) { + if (bindings.containsKey(channelName)) { + if (bindings.get(channelName).getProducer() != null) { + return bindings.get(channelName).getProducer(); + } else { + RocketMQProducerProperties properties = new RocketMQProducerProperties(); + this.bindings.get(channelName).setProducer(properties); + return properties; + } + } else { + RocketMQProducerProperties properties = new RocketMQProducerProperties(); + RocketMQBindingProperties rbp = new RocketMQBindingProperties(); + rbp.setProducer(properties); + bindings.put(channelName, rbp); + return properties; + } + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java new file mode 100644 index 000000000..8c2528344 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.properties; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQProducerProperties { + + private Boolean enabled = true; + + /** + * Maximum allowed message size in bytes + * {@link DefaultMQProducer#maxMessageSize} + */ + private Integer maxMessageSize = 0; + + public Boolean getEnabled() { + return enabled; + } + + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + + public Integer getMaxMessageSize() { + return maxMessageSize; + } + + public void setMaxMessageSize(Integer maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java new file mode 100644 index 000000000..17bf56ac0 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.provisioning; + +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.exception.MQClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.cloud.stream.provisioning.ProvisioningException; +import org.springframework.cloud.stream.provisioning.ProvisioningProvider; + +/** + * @author Timur Valiev + * @author Jim + */ +public class RocketMQTopicProvisioner + implements + ProvisioningProvider, + ExtendedProducerProperties> { + + private static final Logger logger = LoggerFactory.getLogger(RocketMQTopicProvisioner.class); + + @Override + public ProducerDestination provisionProducerDestination(String name, + ExtendedProducerProperties + properties) + throws ProvisioningException { + checkTopic(name); + return new RocketProducerDestination(name); + } + + @Override + public ConsumerDestination provisionConsumerDestination(String name, String group, + ExtendedConsumerProperties + properties) + throws ProvisioningException { + checkTopic(name); + return new RocketConsumerDestination(name); + } + + private void checkTopic(String topic) { + try { + Validators.checkTopic(topic); + } catch (MQClientException e) { + logger.error("topic check error: " + topic, e); + throw new AssertionError(e); // Can't happen + } + } + + private static final class RocketProducerDestination implements ProducerDestination { + + private final String producerDestinationName; + + RocketProducerDestination(String destinationName) { + this.producerDestinationName = destinationName; + } + + @Override + public String getName() { + return producerDestinationName; + } + + @Override + public String getNameForPartition(int partition) { + return producerDestinationName; + } + + } + + private static final class RocketConsumerDestination implements ConsumerDestination { + + private final String consumerDestinationName; + + RocketConsumerDestination(String consumerDestinationName) { + this.consumerDestinationName = consumerDestinationName; + } + + @Override + public String getName() { + return this.consumerDestinationName; + } + + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders new file mode 100644 index 000000000..78d86cf92 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders @@ -0,0 +1 @@ +rocketmq:org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..43b85129a --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration diff --git a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java new file mode 100644 index 000000000..3328bb648 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration; +import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; +import org.springframework.context.ConfigurableApplicationContext; + +/** + * @author Jim + */ +public class RocketMQAutoConfigurationTests { + + private ConfigurableApplicationContext context; + + @Before + public void setUp() throws Exception { + this.context = new SpringApplicationBuilder( + RocketMQBinderEndpointAutoConfiguration.class, + RocketMQBinderAutoConfiguration.class).web(false).run( + "--spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876", + "--spring.cloud.stream.bindings.output.destination=TopicOrderTest", + "--spring.cloud.stream.bindings.output.content-type=application/json", + "--spring.cloud.stream.bindings.input1.destination=TopicOrderTest", + "--spring.cloud.stream.bindings.input1.content-type=application/json", + "--spring.cloud.stream.bindings.input1.group=test-group1", + "--spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true", + "--spring.cloud.stream.bindings.input1.consumer.maxAttempts=1", + "--spring.cloud.stream.bindings.input2.destination=TopicOrderTest", + "--spring.cloud.stream.bindings.input2.content-type=application/json", + "--spring.cloud.stream.bindings.input2.group=test-group2", + "--spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false", + "--spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1"); + } + + @Test + public void testProperties() { + RocketMQBinderConfigurationProperties binderConfigurationProperties = context + .getBean(RocketMQBinderConfigurationProperties.class); + assertThat(binderConfigurationProperties.getNamesrvAddr()) + .isEqualTo("127.0.0.1:9876"); + RocketMQExtendedBindingProperties bindingProperties = context + .getBean(RocketMQExtendedBindingProperties.class); + assertThat(bindingProperties.getExtendedConsumerProperties("input2").getTags()) + .isEqualTo("tag1"); + assertThat(bindingProperties.getExtendedConsumerProperties("input2").getOrderly()) + .isFalse(); + assertThat(bindingProperties.getExtendedConsumerProperties("input1").getOrderly()) + .isTrue(); + } + +} \ No newline at end of file