Code style

pull/2013/head
zkzlx 4 years ago
parent 6d7c47a366
commit 9379d18ace

@ -16,11 +16,11 @@
package com.alibaba.cloud.examples;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
/**
@ -30,22 +30,18 @@ import org.springframework.stereotype.Component;
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object num = msg.getProperty("test");
if ("1".equals(num)) {
System.out.println(
"executer: " + new String(msg.getBody()) + " unknown");
System.out.println("executer: " + new String(msg.getBody()) + " unknown");
return LocalTransactionState.UNKNOW;
}
else if ("2".equals(num)) {
System.out.println(
"executer: " + new String(msg.getBody()) + " rollback");
System.out.println("executer: " + new String(msg.getBody()) + " rollback");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
System.out.println(
"executer: " + new String(msg.getBody()) + " commit");
System.out.println("executer: " + new String(msg.getBody()) + " commit");
return LocalTransactionState.COMMIT_MESSAGE;
}

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -60,6 +60,7 @@ public class RocketMQMessageChannelBinder extends
ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQBinderConfigurationProperties binderConfigurationProperties;
public RocketMQMessageChannelBinder(
@ -175,7 +176,6 @@ public class RocketMQMessageChannelBinder extends
/**
* Binders can return an {@link ErrorMessageStrategy} for building error messages;
* binder implementations typically might add extra headers to the error message.
*
* @return the implementation - may be null.
*/
@Override
@ -203,4 +203,5 @@ public class RocketMQMessageChannelBinder extends
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
}
}

@ -34,7 +34,8 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.CompositeMessageConverter;
/**
* issue:https://github.com/alibaba/spring-cloud-alibaba/issues/1681
* issue:https://github.com/alibaba/spring-cloud-alibaba/issues/1681 .
*
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ -45,6 +46,7 @@ public class RocketMQBinderAutoConfiguration {
@Autowired
private RocketMQExtendedBindingProperties extendedBindingProperties;
@Autowired
private RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -31,9 +31,11 @@ public class RocketMQConst extends MessageConst {
/**
* Default group for SCS RocketMQ Binder.
*/
public static final String DEFAULT_GROUP = "binder_default_group_name";
public static final String DEFAULT_GROUP = "binder_default_group_name";
/**
* user args for SCS RocketMQ Binder.
*/
public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS";
/**
@ -41,19 +43,34 @@ public class RocketMQConst extends MessageConst {
* and parameters are passed through HEADERS.
*/
public static class Headers {
/**
* keys for SCS RocketMQ Headers.
*/
public static final String KEYS = MessageConst.PROPERTY_KEYS;
/**
* tags for SCS RocketMQ Headers.
*/
public static final String TAGS = MessageConst.PROPERTY_TAGS;
/**
* topic for SCS RocketMQ Headers.
*/
public static final String TOPIC = "MQ_TOPIC";
/**
* The ID of the message.
*/
public static final String MESSAGE_ID = "MQ_MESSAGE_ID";
/**
* The timestamp that the message producer invokes the message sending API.
*/
public static final String BORN_TIMESTAMP = "MQ_BORN_TIMESTAMP";
/**
* The IP and port number of the message producer
* The IP and port number of the message producer.
*/
public static final String BORN_HOST = "MQ_BORN_HOST";
@ -61,19 +78,23 @@ public class RocketMQConst extends MessageConst {
* Message flag, MQ is not processed and is available for use by applications.
*/
public static final String FLAG = "MQ_FLAG";
/**
* Message consumption queue ID
* Message consumption queue ID.
*/
public static final String QUEUE_ID = "MQ_QUEUE_ID";
/**
* Message system Flag, such as whether or not to compress, whether or not to
* transactional messages.
*/
public static final String SYS_FLAG = "MQ_SYS_FLAG";
/**
* The transaction ID of the transaction message.
*/
public static final String TRANSACTION_ID = "MQ_TRANSACTION_ID";
}
}

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -27,14 +27,19 @@ import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.util.ClassUtils;
/**
* The default message converter of rocketMq,its bean name is {@link #DEFAULT_NAME}
* The default message converter of rocketMq,its bean name is {@link #DEFAULT_NAME} .
*
* @author zkzlx
*/
public class RocketMQMessageConverter {
/**
* rocketMQMessageConverter.
*/
public static final String DEFAULT_NAME = "rocketMQMessageConverter";
private static final boolean JACKSON_PRESENT;
private static final boolean FASTJSON_PRESENT;
static {
@ -81,4 +86,5 @@ public class RocketMQMessageConverter {
public void setMessageConverter(CompositeMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
}

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -32,12 +32,15 @@ import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.util.StringUtils;
/**
* Gets the beans configured in the configuration file
* Gets the beans configured in the configuration file.
*
* @author junboXiang
*/
public final class RocketMQBeanContainerCache {
private RocketMQBeanContainerCache() {
}
private static final Class<?>[] CLASSES = new Class[] {
CompositeMessageConverter.class, AllocateMessageQueueStrategy.class,
MessageQueueSelector.class, MessageListener.class, TransactionListener.class,

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -22,7 +22,7 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
/**
* find RocketMQ bean by annotations
* find RocketMQ bean by annotations.
*
* @author junboXiang
*

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -27,7 +27,7 @@ public interface ErrorAcknowledgeHandler {
/**
* Ack state handling, including receive, reject, and retry, when a consumption
* exception occurs.
* @param message
* @param message message
* @return see {@link Status}
*/
Status handler(Message<?> message);

@ -41,6 +41,9 @@ import org.springframework.util.StringUtils;
*/
public final class RocketMQConsumerFactory {
private RocketMQConsumerFactory() {
}
private final static Logger log = LoggerFactory
.getLogger(RocketMQConsumerFactory.class);
@ -91,7 +94,8 @@ public final class RocketMQConsumerFactory {
/**
* todo Compatible with versions less than 4.6 ?
* @return
* @param extendedConsumerProperties extendedConsumerProperties
* @return DefaultLitePullConsumer
*/
public static DefaultLitePullConsumer initPullConsumer(
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {

@ -48,7 +48,6 @@ import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* TODO Describe what it does
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQInboundChannelAdapter extends MessageProducerSupport
@ -58,10 +57,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport
.getLogger(RocketMQInboundChannelAdapter.class);
private RetryTemplate retryTemplate;
private RecoveryCallback<Object> recoveryCallback;
private DefaultMQPushConsumer pushConsumer;
private final String topic;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;
public RocketMQInboundChannelAdapter(String topic,
@ -146,11 +148,11 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport
* The actual execution of a user-defined input consumption service method.
* @param messageExtList rocket mq message list
* @param failSupplier {@link ConsumeConcurrentlyStatus} or
* {@link ConsumeOrderlyStatus}
* {@link ConsumeOrderlyStatus}
* @param sucSupplier {@link ConsumeConcurrentlyStatus} or
* {@link ConsumeOrderlyStatus}
* @param <R>
* @return
* {@link ConsumeOrderlyStatus}
* @param <R> object
* @return R
*/
private <R> R consumeMessage(List<MessageExt> messageExtList,
Supplier<R> failSupplier, Supplier<R> sucSupplier) {

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -29,15 +29,16 @@ import org.springframework.messaging.Message;
* @author zkzlx
*/
public class DefaultErrorAcknowledgeHandler implements ErrorAcknowledgeHandler {
/**
* Ack state handling, including receive, reject, and retry, when a consumption
* exception occurs.
*
* @param message
* @param message message
* @return see {@link Status}
*/
@Override
public Status handler(Message<?> message) {
return Status.REQUEUE;
}
}

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -31,16 +31,23 @@ import org.springframework.util.Assert;
/**
* A pollable {@link org.springframework.integration.core.MessageSource} for RocketMQ.
*
* @author zkzlx
*/
public class RocketMQAckCallback implements AcknowledgmentCallback {
private final static Logger log = LoggerFactory.getLogger(RocketMQAckCallback.class);
private boolean acknowledged;
private boolean autoAckEnabled = true;
private MessageExt messageExt;
private AssignedMessageQueue assignedMessageQueue;
private DefaultLitePullConsumer consumer;
private final MessageQueue messageQueue;
public RocketMQAckCallback(DefaultLitePullConsumer consumer,

@ -56,14 +56,18 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
.getLogger(RocketMQMessageSource.class);
private DefaultLitePullConsumer consumer;
private AssignedMessageQueue assignedMessageQueue;
private volatile boolean running;
private final String topic;
private final MessageSelector messageSelector;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;
private volatile Iterator<MessageExt> messageExtIterator=null;
private volatile Iterator<MessageExt> messageExtIterator = null;
public RocketMQMessageSource(String name,
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
@ -85,7 +89,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
this.consumer = RocketMQConsumerFactory
.initPullConsumer(extendedConsumerProperties);
// This parameter must be 1, otherwise doReceive cannot be handled singly.
// this.consumer.setPullBatchSize(1);
// this.consumer.setPullBatchSize(1);
this.consumer.subscribe(topic, messageSelector);
this.consumer.setAutoCommit(false);
this.assignedMessageQueue = acquireAssignedMessageQueue(this.consumer);
@ -135,18 +139,18 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
@Override
protected synchronized Object doReceive() {
if(messageExtIterator == null){
if (messageExtIterator == null) {
List<MessageExt> messageExtList = consumer.poll();
if (CollectionUtils.isEmpty(messageExtList) || messageExtList.size() > 1) {
return null;
}
messageExtIterator = messageExtList.iterator();
}
MessageExt messageExt=messageExtIterator.next();
if(!messageExtIterator.hasNext()){
MessageExt messageExt = messageExtIterator.next();
if (!messageExtIterator.hasNext()) {
messageExtIterator = null;
}
if(null == messageExt){
if (null == messageExt) {
return null;
}
MessageQueue messageQueue = null;
@ -156,8 +160,9 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
break;
}
}
if(messageQueue == null){
throw new IllegalArgumentException("The message queue is not in assigned list");
if (messageQueue == null) {
throw new IllegalArgumentException(
"The message queue is not in assigned list");
}
Message message = RocketMQMessageConverterSupport
.convertMessage2Spring(messageExt);

@ -39,18 +39,23 @@ import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* Extended function related to producer . eg:initial
* Extended function related to producer . eg:initial .
*
* @author zkzlx
*/
public final class RocketMQProduceFactory {
private RocketMQProduceFactory() {
}
private final static Logger log = LoggerFactory
.getLogger(RocketMQProduceFactory.class);
/**
* init for the producer,including convert producer params.
* @return
* @param topic topic
* @param producerProperties producerProperties
* @return DefaultMQProducer
*/
public static DefaultMQProducer initRocketMQProducer(String topic,
RocketMQProducerProperties producerProperties) {

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -61,16 +61,23 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler
.getLogger(RocketMQProducerMessageHandler.class);
private volatile boolean running = false;
private volatile boolean isTrans = false;
private ErrorMessageStrategy errorMessageStrategy;
private MessageChannel sendFailureChannel;
private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;
private DefaultMQProducer defaultMQProducer;
private MessageQueueSelector messageQueueSelector;
private final ProducerDestination destination;
private final ExtendedProducerProperties<RocketMQProducerProperties> extendedProducerProperties;
private final RocketMQProducerProperties mqProducerProperties;
public RocketMQProducerMessageHandler(ProducerDestination destination,
@ -93,10 +100,8 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler
// Use the default if the partition is on and no customization is available.
this.messageQueueSelector = RocketMQBeanContainerCache.getBean(
mqProducerProperties.getMessageQueueSelector(),
MessageQueueSelector.class,
extendedProducerProperties.isPartitioned()
? new PartitionMessageQueueSelector()
: null);
MessageQueueSelector.class, extendedProducerProperties.isPartitioned()
? new PartitionMessageQueueSelector() : null);
}
@Override
@ -226,9 +231,9 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler
}
/**
* https://github.com/alibaba/spring-cloud-alibaba/issues/1408
* @param message
* @return
* https://github.com/alibaba/spring-cloud-alibaba/issues/1408 .
* @param message message
* @return SendCallback
*/
private SendCallback getSendCallback(Message<?> message) {
SendCallback sendCallback = RocketMQBeanContainerCache
@ -283,4 +288,5 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler
this.partitioningInterceptor = partitioningInterceptor;
return this;
}
}

@ -28,6 +28,7 @@ import org.springframework.context.Lifecycle;
public class Instrumentation {
private final String name;
private Lifecycle actuator;
protected final AtomicBoolean started = new AtomicBoolean(false);
@ -88,4 +89,17 @@ public class Instrumentation {
public int hashCode() {
return Objects.hash(getName(), getActuator());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Instrumentation that = (Instrumentation) o;
return name.equals(that.name) && actuator.equals(that.actuator);
}
}

@ -26,6 +26,9 @@ import java.util.Map;
*/
public final class InstrumentationManager {
private InstrumentationManager() {
}
private static final Map<Integer, Instrumentation> HEALTH_INSTRUMENTATIONS = new HashMap<>();
public static Collection<Instrumentation> getHealthInstrumentations() {

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -20,6 +20,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* binding rocketMq properties.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -26,6 +26,7 @@ import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
* @author zkzlx
*/
public class RocketMQCommonProperties implements Serializable {
private static final long serialVersionUID = -6724870154343284715L;
private boolean enabled = true;
@ -41,38 +42,36 @@ public class RocketMQCommonProperties implements Serializable {
* The property of "secret-key".
*/
private String secretKey;
/**
* Consumers of the same role is required to have exactly same subscriptions and
* consumerGroup to correctly achieve load balance. It's required and needs to be
* globally unique.
* </p>
* Producer group conceptually aggregates all producer instances of exactly same role,
* which is particularly important when transactional messages are involved.
* </p>
* <p>
* For non-transactional messages, it does not matter as long as it's unique per
* process.
* </p>
* <p>
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further
* discussion.
* globally unique. Producer group conceptually aggregates all producer instances of
* exactly same role, which is particularly important when transactional messages are
* involved. For non-transactional messages, it does not matter as long as it's unique
* per process. See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a>
* for further discussion.
*/
private String group;
private String namespace;
private String accessChannel = AccessChannel.LOCAL.name();
/**
* Pulling topic information interval from the named server.
* see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask
* updateTopicRouteInfoFromNameServer.
*/
private int pollNameServerInterval = 1000 * 30;
/**
* Heartbeat interval in microseconds with message broker.
* see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask
* sendHeartbeatToAllBroker .
*/
private int heartbeatBrokerInterval = 1000 * 30;
/**
* Offset persistent interval for consumer.
* see{@link MQClientInstance#startScheduledTask()},eg:ScheduledTask
@ -85,6 +84,7 @@ public class RocketMQCommonProperties implements Serializable {
private boolean useTLS = TlsSystemConfig.tlsEnable;
private boolean enableMsgTrace = true;
private String customizedTraceTopic;
public boolean getEnabled() {
@ -198,4 +198,5 @@ public class RocketMQCommonProperties implements Serializable {
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
}

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -37,8 +37,6 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
/**
* Message model defines the way how messages are delivered to each consumer clients.
* </p>
*
* This field defaults to clustering.
*/
private String messageModel = MessageModel.CLUSTERING.getModeCN();
@ -60,13 +58,12 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
private String subscription;
/**
* Delay some time when exception occur
* Delay some time when exception occur .
*/
private long pullTimeDelayMillsWhenException = 1000;
/**
* Consuming point on consumer booting.
* </p>
*
* There are three consuming points:
* <ul>
@ -90,6 +87,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
* </ul>
*/
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
* Backtracking consumption time with second precision. Time format is
* 20131223171201<br>
@ -102,13 +100,14 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
/**
* Flow control threshold on queue level, each message queue will cache at most 1000
* messages by default, Consider the {@link #pullBatchSize}, the instantaneous value
* may exceed the limit
* may exceed the limit .
*/
private int pullThresholdForQueue = 1000;
/**
* Limit the cached message size on queue level, each message queue will cache at most
* 100 MiB messages by default, Consider the {@link #pullBatchSize}, the instantaneous
* value may exceed the limit
* value may exceed the limit .
*
* <p>
* The size of a message only measured by message body, so it's not accurate
@ -126,6 +125,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
private int consumeMaxSpan = 2000;
private Push push = new Push();
private Pull pull = new Pull();
public String getMessageModel() {
@ -238,6 +238,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
}
public static class Push implements Serializable {
private static final long serialVersionUID = -7398468554978817630L;
/**
@ -245,6 +246,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
* false, using {@link MessageListenerConcurrently}.
*/
private boolean orderly = false;
/**
* Suspending pulling time for cases requiring slow pulling like flow-control
* scenario. see{@link ConsumeMessageOrderlyService#processConsumeResult}.
@ -254,10 +256,9 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
/**
* https://github.com/alibaba/spring-cloud-alibaba/issues/1866 Max re-consume
* times. -1 means 16 times.
* </p>
* If messages are re-consumed more than {@link #maxReconsumeTimes} before
* success, it's be directed to a deletion queue waiting.
* times. -1 means 16 times. If messages are re-consumed more than
* {@link #maxReconsumeTimes} before success, it's be directed to a deletion queue
* waiting.
*/
private int maxReconsumeTimes;
@ -285,21 +286,21 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
* MiB(Unlimited)
* <p>
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated
* based on {@code pullThresholdSizeForTopic} if it is't unlimited
* based on {@code pullThresholdSizeForTopic} if it is't unlimited .
* <p>
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10
* message queues are assigned to this consumer, then pullThresholdSizeForQueue
* will be set to 100 MiB
* will be set to 100 MiB .
*/
private int pullThresholdSizeForTopic = -1;
/**
* Message pull Interval
* Message pull Interval.
*/
private long pullInterval = 0;
/**
* Batch consumption size
* Batch consumption size.
*/
private int consumeMessageBatchMaxSize = 1;
@ -366,15 +367,18 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
}
}
public static class Pull implements Serializable {
/**
* The poll timeout in milliseconds
* The poll timeout in milliseconds.
*/
private long pollTimeoutMillis = 1000 * 5;
/**
* Pull thread number
* Pull thread number.
*/
private int pullThreadNums = 20;
@ -385,13 +389,13 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
/**
* Long polling mode, the Consumer connection timeout(must greater than
* brokerSuspendMaxTimeMillis), it is not recommended to modify
* brokerSuspendMaxTimeMillis), it is not recommended to modify.
*/
private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
/**
* Ack state handling, including receive, reject, and retry, when a consumption
* exception occurs. see {@link }
* exception occurs.
*/
private String errAcknowledge;
@ -446,6 +450,7 @@ public class RocketMQConsumerProperties extends RocketMQCommonProperties {
public void setPullThresholdForAll(long pullThresholdForAll) {
this.pullThresholdForAll = pullThresholdForAll;
}
}
}

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -37,7 +37,6 @@ public class RocketMQProducerProperties extends RocketMQCommonProperties {
/**
* Maximum number of retry to perform internally before claiming sending failure in
* synchronous mode.
* </p>
*
* This may potentially cause message duplication which is up to application
* developers to resolve.
@ -47,7 +46,6 @@ public class RocketMQProducerProperties extends RocketMQCommonProperties {
/**
* Maximum number of retry to perform internally before claiming sending failure in
* asynchronous mode.
* </p>
*
* This may potentially cause message duplication which is up to application
* developers to resolve.
@ -203,19 +201,41 @@ public class RocketMQProducerProperties extends RocketMQCommonProperties {
}
public enum ProducerType {
Normal, Trans;
/**
* Is not a transaction.
*/
Normal,
/**
* a transaction.
*/
Trans;
public boolean equalsName(String name) {
return this.name().equalsIgnoreCase(name);
}
}
public enum SendType {
OneWay, Async, Sync,;
/**
* one way.
*/
OneWay,
/**
* Asynchronization Model.
*/
Async,
/**
* synchronization.
*/
Sync,;
public boolean equalsName(String name) {
return this.name().equalsIgnoreCase(name);
}
}
}

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -36,10 +36,12 @@ import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StringUtils;
/**
*
* @author zkzlx
*/
public class RocketMQMessageConverterSupport {
public final class RocketMQMessageConverterSupport {
private RocketMQMessageConverterSupport() {
}
private static final CompositeMessageConverter MESSAGE_CONVERTER = RocketMQBeanContainerCache
.getBean(RocketMQMessageConverter.DEFAULT_NAME,

@ -1,11 +1,11 @@
/*
* Copyright (C) 2018 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -28,11 +28,12 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.springframework.util.StringUtils;
/**
* TODO Describe what it does
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQUtils {
public final class RocketMQUtils {
private RocketMQUtils() {
}
public static <T extends RocketMQCommonProperties> T mergeRocketMQProperties(
RocketMQBinderConfigurationProperties binderConfigurationProperties,

@ -16,8 +16,6 @@
package com.alibaba.cloud.stream.binder.rocketmq;
import java.util.Arrays;
import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
@ -61,13 +59,12 @@ public class RocketMQAutoConfigurationTests {
.isEqualTo("127.0.0.1:9876,127.0.0.1:9877");
RocketMQExtendedBindingProperties bindingProperties = context
.getBean(RocketMQExtendedBindingProperties.class);
assertThat(
bindingProperties.getExtendedConsumerProperties("input2").getSubscription())
.isEqualTo("tag1");
assertThat(bindingProperties.getExtendedConsumerProperties("input2").getPush().getOrderly()
).isFalse();
assertThat(bindingProperties.getExtendedConsumerProperties("input1")
.getPush().getOrderly()).isTrue();
assertThat(bindingProperties.getExtendedConsumerProperties("input2")
.getSubscription()).isEqualTo("tag1");
assertThat(bindingProperties.getExtendedConsumerProperties("input2").getPush()
.getOrderly()).isFalse();
assertThat(bindingProperties.getExtendedConsumerProperties("input1").getPush()
.getOrderly()).isTrue();
});
}

Loading…
Cancel
Save