diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java index 1cc1b3290..446570ec9 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/TransactionListenerImpl.java @@ -16,11 +16,11 @@ package com.alibaba.cloud.examples; - import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; + import org.springframework.stereotype.Component; /** @@ -30,22 +30,18 @@ import org.springframework.stereotype.Component; public class TransactionListenerImpl implements TransactionListener { @Override - public LocalTransactionState executeLocalTransaction(Message msg, - Object arg) { + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { Object num = msg.getProperty("test"); if ("1".equals(num)) { - System.out.println( - "executer: " + new String(msg.getBody()) + " unknown"); + System.out.println("executer: " + new String(msg.getBody()) + " unknown"); return LocalTransactionState.UNKNOW; } else if ("2".equals(num)) { - System.out.println( - "executer: " + new String(msg.getBody()) + " rollback"); + System.out.println("executer: " + new String(msg.getBody()) + " rollback"); return LocalTransactionState.ROLLBACK_MESSAGE; } - System.out.println( - "executer: " + new String(msg.getBody()) + " commit"); + System.out.println("executer: " + new String(msg.getBody()) + " commit"); return LocalTransactionState.COMMIT_MESSAGE; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 5683bff68..403f51476 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -60,6 +60,7 @@ public class RocketMQMessageChannelBinder extends ExtendedPropertiesBinder { private final RocketMQExtendedBindingProperties extendedBindingProperties; + private final RocketMQBinderConfigurationProperties binderConfigurationProperties; public RocketMQMessageChannelBinder( @@ -175,7 +176,6 @@ public class RocketMQMessageChannelBinder extends /** * Binders can return an {@link ErrorMessageStrategy} for building error messages; * binder implementations typically might add extra headers to the error message. - * * @return the implementation - may be null. */ @Override @@ -203,4 +203,5 @@ public class RocketMQMessageChannelBinder extends public Class getExtendedPropertiesEntryClass() { return this.extendedBindingProperties.getExtendedPropertiesEntryClass(); } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java index abcb9b961..ec47e951a 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java @@ -34,7 +34,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.CompositeMessageConverter; /** - * issue:https://github.com/alibaba/spring-cloud-alibaba/issues/1681 + * issue:https://github.com/alibaba/spring-cloud-alibaba/issues/1681 . + * * @author Timur Valiev * @author Jim */ @@ -45,6 +46,7 @@ public class RocketMQBinderAutoConfiguration { @Autowired private RocketMQExtendedBindingProperties extendedBindingProperties; + @Autowired private RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java index 694c8519c..4f893642e 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -31,9 +31,11 @@ public class RocketMQConst extends MessageConst { /** * Default group for SCS RocketMQ Binder. */ - public static final String DEFAULT_GROUP = "binder_default_group_name"; - + public static final String DEFAULT_GROUP = "binder_default_group_name"; + /** + * user args for SCS RocketMQ Binder. + */ public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS"; /** @@ -41,19 +43,34 @@ public class RocketMQConst extends MessageConst { * and parameters are passed through HEADERS. */ public static class Headers { + + /** + * keys for SCS RocketMQ Headers. + */ public static final String KEYS = MessageConst.PROPERTY_KEYS; + + /** + * tags for SCS RocketMQ Headers. + */ public static final String TAGS = MessageConst.PROPERTY_TAGS; + + /** + * topic for SCS RocketMQ Headers. + */ public static final String TOPIC = "MQ_TOPIC"; + /** * The ID of the message. */ public static final String MESSAGE_ID = "MQ_MESSAGE_ID"; + /** * The timestamp that the message producer invokes the message sending API. */ public static final String BORN_TIMESTAMP = "MQ_BORN_TIMESTAMP"; + /** - * The IP and port number of the message producer + * The IP and port number of the message producer. */ public static final String BORN_HOST = "MQ_BORN_HOST"; @@ -61,19 +78,23 @@ public class RocketMQConst extends MessageConst { * Message flag, MQ is not processed and is available for use by applications. */ public static final String FLAG = "MQ_FLAG"; + /** - * Message consumption queue ID + * Message consumption queue ID. */ public static final String QUEUE_ID = "MQ_QUEUE_ID"; + /** * Message system Flag, such as whether or not to compress, whether or not to * transactional messages. */ public static final String SYS_FLAG = "MQ_SYS_FLAG"; + /** * The transaction ID of the transaction message. */ public static final String TRANSACTION_ID = "MQ_TRANSACTION_ID"; + } } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java index 58f17c6ef..98bd03263 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -27,14 +27,19 @@ import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.util.ClassUtils; /** - * The default message converter of rocketMq,its bean name is {@link #DEFAULT_NAME} + * The default message converter of rocketMq,its bean name is {@link #DEFAULT_NAME} . + * * @author zkzlx */ public class RocketMQMessageConverter { + /** + * rocketMQMessageConverter. + */ public static final String DEFAULT_NAME = "rocketMQMessageConverter"; private static final boolean JACKSON_PRESENT; + private static final boolean FASTJSON_PRESENT; static { @@ -81,4 +86,5 @@ public class RocketMQMessageConverter { public void setMessageConverter(CompositeMessageConverter messageConverter) { this.messageConverter = messageConverter; } -} \ No newline at end of file + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java index 9afc94821..21c62aede 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -32,12 +32,15 @@ import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.util.StringUtils; /** - * Gets the beans configured in the configuration file + * Gets the beans configured in the configuration file. * * @author junboXiang */ public final class RocketMQBeanContainerCache { + private RocketMQBeanContainerCache() { + } + private static final Class[] CLASSES = new Class[] { CompositeMessageConverter.class, AllocateMessageQueueStrategy.class, MessageQueueSelector.class, MessageListener.class, TransactionListener.class, diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java index 30bd64328..a83fdbe08 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,7 +22,7 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; /** - * find RocketMQ bean by annotations + * find RocketMQ bean by annotations. * * @author junboXiang * diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java index fcf6de804..dfe65fd94 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -27,7 +27,7 @@ public interface ErrorAcknowledgeHandler { /** * Ack state handling, including receive, reject, and retry, when a consumption * exception occurs. - * @param message + * @param message message * @return see {@link Status} */ Status handler(Message message); diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java index 876249581..7f7e1bf26 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java @@ -41,6 +41,9 @@ import org.springframework.util.StringUtils; */ public final class RocketMQConsumerFactory { + private RocketMQConsumerFactory() { + } + private final static Logger log = LoggerFactory .getLogger(RocketMQConsumerFactory.class); @@ -91,7 +94,8 @@ public final class RocketMQConsumerFactory { /** * todo Compatible with versions less than 4.6 ? - * @return + * @param extendedConsumerProperties extendedConsumerProperties + * @return DefaultLitePullConsumer */ public static DefaultLitePullConsumer initPullConsumer( ExtendedConsumerProperties extendedConsumerProperties) { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java index d0aec5523..61c0eedd3 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java @@ -48,7 +48,6 @@ import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; /** - * TODO Describe what it does * @author Jim */ public class RocketMQInboundChannelAdapter extends MessageProducerSupport @@ -58,10 +57,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport .getLogger(RocketMQInboundChannelAdapter.class); private RetryTemplate retryTemplate; + private RecoveryCallback recoveryCallback; + private DefaultMQPushConsumer pushConsumer; private final String topic; + private final ExtendedConsumerProperties extendedConsumerProperties; public RocketMQInboundChannelAdapter(String topic, @@ -146,11 +148,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport * The actual execution of a user-defined input consumption service method. * @param messageExtList rocket mq message list * @param failSupplier {@link ConsumeConcurrentlyStatus} or - * {@link ConsumeOrderlyStatus} + * {@link ConsumeOrderlyStatus} * @param sucSupplier {@link ConsumeConcurrentlyStatus} or - * {@link ConsumeOrderlyStatus} - * @param - * @return + * {@link ConsumeOrderlyStatus} + * @param object + * @return R */ private R consumeMessage(List messageExtList, Supplier failSupplier, Supplier sucSupplier) { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/DefaultErrorAcknowledgeHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/DefaultErrorAcknowledgeHandler.java index 3296a128e..458324741 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/DefaultErrorAcknowledgeHandler.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/DefaultErrorAcknowledgeHandler.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -29,15 +29,16 @@ import org.springframework.messaging.Message; * @author zkzlx */ public class DefaultErrorAcknowledgeHandler implements ErrorAcknowledgeHandler { + /** * Ack state handling, including receive, reject, and retry, when a consumption * exception occurs. - * - * @param message + * @param message message * @return see {@link Status} */ @Override public Status handler(Message message) { return Status.REQUEUE; } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java index f617dd097..56e50ca4a 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQAckCallback.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -31,16 +31,23 @@ import org.springframework.util.Assert; /** * A pollable {@link org.springframework.integration.core.MessageSource} for RocketMQ. + * * @author zkzlx */ public class RocketMQAckCallback implements AcknowledgmentCallback { + private final static Logger log = LoggerFactory.getLogger(RocketMQAckCallback.class); private boolean acknowledged; + private boolean autoAckEnabled = true; + private MessageExt messageExt; + private AssignedMessageQueue assignedMessageQueue; + private DefaultLitePullConsumer consumer; + private final MessageQueue messageQueue; public RocketMQAckCallback(DefaultLitePullConsumer consumer, @@ -109,4 +116,4 @@ public class RocketMQAckCallback implements AcknowledgmentCallback { } } -} \ No newline at end of file +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java index a378fc615..a7d7e73da 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java @@ -56,14 +56,18 @@ public class RocketMQMessageSource extends AbstractMessageSource .getLogger(RocketMQMessageSource.class); private DefaultLitePullConsumer consumer; + private AssignedMessageQueue assignedMessageQueue; + private volatile boolean running; private final String topic; + private final MessageSelector messageSelector; + private final ExtendedConsumerProperties extendedConsumerProperties; - private volatile Iterator messageExtIterator=null; + private volatile Iterator messageExtIterator = null; public RocketMQMessageSource(String name, ExtendedConsumerProperties extendedConsumerProperties) { @@ -85,7 +89,7 @@ public class RocketMQMessageSource extends AbstractMessageSource this.consumer = RocketMQConsumerFactory .initPullConsumer(extendedConsumerProperties); // This parameter must be 1, otherwise doReceive cannot be handled singly. -// this.consumer.setPullBatchSize(1); + // this.consumer.setPullBatchSize(1); this.consumer.subscribe(topic, messageSelector); this.consumer.setAutoCommit(false); this.assignedMessageQueue = acquireAssignedMessageQueue(this.consumer); @@ -135,18 +139,18 @@ public class RocketMQMessageSource extends AbstractMessageSource @Override protected synchronized Object doReceive() { - if(messageExtIterator == null){ + if (messageExtIterator == null) { List messageExtList = consumer.poll(); if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) { return null; } messageExtIterator = messageExtList.iterator(); } - MessageExt messageExt=messageExtIterator.next(); - if(!messageExtIterator.hasNext()){ + MessageExt messageExt = messageExtIterator.next(); + if (!messageExtIterator.hasNext()) { messageExtIterator = null; } - if(null == messageExt){ + if (null == messageExt) { return null; } MessageQueue messageQueue = null; @@ -156,8 +160,9 @@ public class RocketMQMessageSource extends AbstractMessageSource break; } } - if(messageQueue == null){ - throw new IllegalArgumentException("The message queue is not in assigned list"); + if (messageQueue == null) { + throw new IllegalArgumentException( + "The message queue is not in assigned list"); } Message message = RocketMQMessageConverterSupport .convertMessage2Spring(messageExt); diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java index 7017ba46e..463868438 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java @@ -39,18 +39,23 @@ import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** - * Extended function related to producer . eg:initial + * Extended function related to producer . eg:initial . * * @author zkzlx */ public final class RocketMQProduceFactory { + private RocketMQProduceFactory() { + } + private final static Logger log = LoggerFactory .getLogger(RocketMQProduceFactory.class); /** * init for the producer,including convert producer params. - * @return + * @param topic topic + * @param producerProperties producerProperties + * @return DefaultMQProducer */ public static DefaultMQProducer initRocketMQProducer(String topic, RocketMQProducerProperties producerProperties) { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java index 3d58b8af0..56c208759 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -61,16 +61,23 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler .getLogger(RocketMQProducerMessageHandler.class); private volatile boolean running = false; + private volatile boolean isTrans = false; private ErrorMessageStrategy errorMessageStrategy; + private MessageChannel sendFailureChannel; + private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor; + private DefaultMQProducer defaultMQProducer; + private MessageQueueSelector messageQueueSelector; private final ProducerDestination destination; + private final ExtendedProducerProperties extendedProducerProperties; + private final RocketMQProducerProperties mqProducerProperties; public RocketMQProducerMessageHandler(ProducerDestination destination, @@ -93,10 +100,8 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler // Use the default if the partition is on and no customization is available. this.messageQueueSelector = RocketMQBeanContainerCache.getBean( mqProducerProperties.getMessageQueueSelector(), - MessageQueueSelector.class, - extendedProducerProperties.isPartitioned() - ? new PartitionMessageQueueSelector() - : null); + MessageQueueSelector.class, extendedProducerProperties.isPartitioned() + ? new PartitionMessageQueueSelector() : null); } @Override @@ -226,9 +231,9 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler } /** - * https://github.com/alibaba/spring-cloud-alibaba/issues/1408 - * @param message - * @return + * https://github.com/alibaba/spring-cloud-alibaba/issues/1408 . + * @param message message + * @return SendCallback */ private SendCallback getSendCallback(Message message) { SendCallback sendCallback = RocketMQBeanContainerCache @@ -283,4 +288,5 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler this.partitioningInterceptor = partitioningInterceptor; return this; } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/metrics/Instrumentation.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/metrics/Instrumentation.java index 397e62854..e26482855 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/metrics/Instrumentation.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/metrics/Instrumentation.java @@ -28,6 +28,7 @@ import org.springframework.context.Lifecycle; public class Instrumentation { private final String name; + private Lifecycle actuator; protected final AtomicBoolean started = new AtomicBoolean(false); @@ -88,4 +89,17 @@ public class Instrumentation { public int hashCode() { return Objects.hash(getName(), getActuator()); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Instrumentation that = (Instrumentation) o; + return name.equals(that.name) && actuator.equals(that.actuator); + } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java index de6e1e794..ad7958e44 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java @@ -26,6 +26,9 @@ import java.util.Map; */ public final class InstrumentationManager { + private InstrumentationManager() { + } + private static final Map HEALTH_INSTRUMENTATIONS = new HashMap<>(); public static Collection getHealthInstrumentations() { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index fd5e77919..3da04ea18 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,6 +20,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; /** * binding rocketMq properties. + * * @author Jim */ @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java index 00e7d30dd..9e9114641 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,6 +26,7 @@ import org.apache.rocketmq.remoting.netty.TlsSystemConfig; * @author zkzlx */ public class RocketMQCommonProperties implements Serializable { + private static final long serialVersionUID = -6724870154343284715L; private boolean enabled = true; @@ -41,38 +42,36 @@ public class RocketMQCommonProperties implements Serializable { * The property of "secret-key". */ private String secretKey; + /** * Consumers of the same role is required to have exactly same subscriptions and * consumerGroup to correctly achieve load balance. It's required and needs to be - * globally unique. - *

- * Producer group conceptually aggregates all producer instances of exactly same role, - * which is particularly important when transactional messages are involved. - *

- *

- * For non-transactional messages, it does not matter as long as it's unique per - * process. - *

- *

- * See here for further - * discussion. + * globally unique. Producer group conceptually aggregates all producer instances of + * exactly same role, which is particularly important when transactional messages are + * involved. For non-transactional messages, it does not matter as long as it's unique + * per process. See here + * for further discussion. */ private String group; private String namespace; + private String accessChannel = AccessChannel.LOCAL.name(); + /** * Pulling topic information interval from the named server. * see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask * updateTopicRouteInfoFromNameServer. */ private int pollNameServerInterval = 1000 * 30; + /** * Heartbeat interval in microseconds with message broker. * see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask * sendHeartbeatToAllBroker . */ private int heartbeatBrokerInterval = 1000 * 30; + /** * Offset persistent interval for consumer. * see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask @@ -85,6 +84,7 @@ public class RocketMQCommonProperties implements Serializable { private boolean useTLS = TlsSystemConfig.tlsEnable; private boolean enableMsgTrace = true; + private String customizedTraceTopic; public boolean getEnabled() { @@ -198,4 +198,5 @@ public class RocketMQCommonProperties implements Serializable { public void setCustomizedTraceTopic(String customizedTraceTopic) { this.customizedTraceTopic = customizedTraceTopic; } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 1a889f6d7..ebe2c1c43 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -37,8 +37,6 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { /** * Message model defines the way how messages are delivered to each consumer clients. - *

- * * This field defaults to clustering. */ private String messageModel = MessageModel.CLUSTERING.getModeCN(); @@ -60,13 +58,12 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { private String subscription; /** - * Delay some time when exception occur + * Delay some time when exception occur . */ private long pullTimeDelayMillsWhenException = 1000; /** * Consuming point on consumer booting. - *

* * There are three consuming points: *
    @@ -90,6 +87,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { *
*/ private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + /** * Backtracking consumption time with second precision. Time format is * 20131223171201
@@ -102,13 +100,14 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { /** * Flow control threshold on queue level, each message queue will cache at most 1000 * messages by default, Consider the {@link #pullBatchSize}, the instantaneous value - * may exceed the limit + * may exceed the limit . */ private int pullThresholdForQueue = 1000; + /** * Limit the cached message size on queue level, each message queue will cache at most * 100 MiB messages by default, Consider the {@link #pullBatchSize}, the instantaneous - * value may exceed the limit + * value may exceed the limit . * *

* The size of a message only measured by message body, so it's not accurate @@ -126,6 +125,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { private int consumeMaxSpan = 2000; private Push push = new Push(); + private Pull pull = new Pull(); public String getMessageModel() { @@ -238,6 +238,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { } public static class Push implements Serializable { + private static final long serialVersionUID = -7398468554978817630L; /** @@ -245,6 +246,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { * false, using {@link MessageListenerConcurrently}. */ private boolean orderly = false; + /** * Suspending pulling time for cases requiring slow pulling like flow-control * scenario. see{@link ConsumeMessageOrderlyService#processConsumeResult}. @@ -254,10 +256,9 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { /** * https://github.com/alibaba/spring-cloud-alibaba/issues/1866 Max re-consume - * times. -1 means 16 times. - *

- * If messages are re-consumed more than {@link #maxReconsumeTimes} before - * success, it's be directed to a deletion queue waiting. + * times. -1 means 16 times. If messages are re-consumed more than + * {@link #maxReconsumeTimes} before success, it's be directed to a deletion queue + * waiting. */ private int maxReconsumeTimes; @@ -285,21 +286,21 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { * MiB(Unlimited) *

* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated - * based on {@code pullThresholdSizeForTopic} if it is't unlimited + * based on {@code pullThresholdSizeForTopic} if it is't unlimited . *

* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 * message queues are assigned to this consumer, then pullThresholdSizeForQueue - * will be set to 100 MiB + * will be set to 100 MiB . */ private int pullThresholdSizeForTopic = -1; /** - * Message pull Interval + * Message pull Interval. */ private long pullInterval = 0; /** - * Batch consumption size + * Batch consumption size. */ private int consumeMessageBatchMaxSize = 1; @@ -366,15 +367,18 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) { this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; } + } public static class Pull implements Serializable { + /** - * The poll timeout in milliseconds + * The poll timeout in milliseconds. */ private long pollTimeoutMillis = 1000 * 5; + /** - * Pull thread number + * Pull thread number. */ private int pullThreadNums = 20; @@ -385,13 +389,13 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { /** * Long polling mode, the Consumer connection timeout(must greater than - * brokerSuspendMaxTimeMillis), it is not recommended to modify + * brokerSuspendMaxTimeMillis), it is not recommended to modify. */ private long consumerTimeoutMillisWhenSuspend = 1000 * 30; /** * Ack state handling, including receive, reject, and retry, when a consumption - * exception occurs. see {@link } + * exception occurs. */ private String errAcknowledge; @@ -446,6 +450,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties { public void setPullThresholdForAll(long pullThresholdForAll) { this.pullThresholdForAll = pullThresholdForAll; } + } } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java index 5fc34ed08..6c4af119d 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java index 8f73cf93f..4035f1483 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -37,7 +37,6 @@ public class RocketMQProducerProperties extends RocketMQCommonProperties { /** * Maximum number of retry to perform internally before claiming sending failure in * synchronous mode. - *

* * This may potentially cause message duplication which is up to application * developers to resolve. @@ -47,7 +46,6 @@ public class RocketMQProducerProperties extends RocketMQCommonProperties { /** * Maximum number of retry to perform internally before claiming sending failure in * asynchronous mode. - *

* * This may potentially cause message duplication which is up to application * developers to resolve. @@ -203,19 +201,41 @@ public class RocketMQProducerProperties extends RocketMQCommonProperties { } public enum ProducerType { - Normal, Trans; + + /** + * Is not a transaction. + */ + Normal, + /** + * a transaction. + */ + Trans; public boolean equalsName(String name) { return this.name().equalsIgnoreCase(name); } + } public enum SendType { - OneWay, Async, Sync,; + + /** + * one way. + */ + OneWay, + /** + * Asynchronization Model. + */ + Async, + /** + * synchronization. + */ + Sync,; public boolean equalsName(String name) { return this.name().equalsIgnoreCase(name); } + } } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java index 99a1a36fb..990a211ce 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQSpecificPropertiesProvider.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java index d4b62e5fa..0509805ea 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -36,10 +36,12 @@ import org.springframework.util.MimeTypeUtils; import org.springframework.util.StringUtils; /** - * * @author zkzlx */ -public class RocketMQMessageConverterSupport { +public final class RocketMQMessageConverterSupport { + + private RocketMQMessageConverterSupport() { + } private static final CompositeMessageConverter MESSAGE_CONVERTER = RocketMQBeanContainerCache .getBean(RocketMQMessageConverter.DEFAULT_NAME, diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java index 85165955d..1aed546a0 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java @@ -1,11 +1,11 @@ /* - * Copyright (C) 2018 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -28,11 +28,12 @@ import org.apache.rocketmq.remoting.RPCHook; import org.springframework.util.StringUtils; /** - * TODO Describe what it does - * * @author Jim */ -public class RocketMQUtils { +public final class RocketMQUtils { + + private RocketMQUtils() { + } public static T mergeRocketMQProperties( RocketMQBinderConfigurationProperties binderConfigurationProperties, diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java index 02d0de9f9..f850afb4e 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -16,8 +16,6 @@ package com.alibaba.cloud.stream.binder.rocketmq; -import java.util.Arrays; - import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; @@ -61,13 +59,12 @@ public class RocketMQAutoConfigurationTests { .isEqualTo("127.0.0.1:9876,127.0.0.1:9877"); RocketMQExtendedBindingProperties bindingProperties = context .getBean(RocketMQExtendedBindingProperties.class); - assertThat( - bindingProperties.getExtendedConsumerProperties("input2").getSubscription()) - .isEqualTo("tag1"); - assertThat(bindingProperties.getExtendedConsumerProperties("input2").getPush().getOrderly() - ).isFalse(); - assertThat(bindingProperties.getExtendedConsumerProperties("input1") - .getPush().getOrderly()).isTrue(); + assertThat(bindingProperties.getExtendedConsumerProperties("input2") + .getSubscription()).isEqualTo("tag1"); + assertThat(bindingProperties.getExtendedConsumerProperties("input2").getPush() + .getOrderly()).isFalse(); + assertThat(bindingProperties.getExtendedConsumerProperties("input1").getPush() + .getOrderly()).isTrue(); }); }