|
|
@ -4,6 +4,7 @@ import java.util.ArrayList;
|
|
|
|
import java.util.Arrays;
|
|
|
|
import java.util.Arrays;
|
|
|
|
import java.util.HashSet;
|
|
|
|
import java.util.HashSet;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
import java.util.Optional;
|
|
|
|
import java.util.Set;
|
|
|
|
import java.util.Set;
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
|
@ -48,20 +49,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
|
|
|
|
|
|
|
|
private ConsumerInstrumentation consumerInstrumentation;
|
|
|
|
private ConsumerInstrumentation consumerInstrumentation;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private InstrumentationManager instrumentationManager;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private RetryTemplate retryTemplate;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private RecoveryCallback<? extends Object> recoveryCallback;
|
|
|
|
|
|
|
|
|
|
|
|
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
|
|
|
|
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
|
|
|
|
|
|
|
|
|
|
|
|
private final String destination;
|
|
|
|
private final String destination;
|
|
|
|
|
|
|
|
|
|
|
|
private final String group;
|
|
|
|
private final String group;
|
|
|
|
|
|
|
|
|
|
|
|
private final InstrumentationManager instrumentationManager;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final ConsumersManager consumersManager;
|
|
|
|
private final ConsumersManager consumersManager;
|
|
|
|
|
|
|
|
|
|
|
|
private RetryTemplate retryTemplate;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private RecoveryCallback<? extends Object> recoveryCallback;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
|
|
|
|
public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
|
|
|
|
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
|
|
|
|
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
|
|
|
|
String destination, String group,
|
|
|
|
String destination, String group,
|
|
|
@ -75,21 +76,20 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
protected void doStart() {
|
|
|
|
protected void doStart() {
|
|
|
|
if (!consumerProperties.getExtension().getEnabled()) {
|
|
|
|
if (consumerProperties == null
|
|
|
|
|
|
|
|
|| !consumerProperties.getExtension().getEnabled()) {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
String tags = consumerProperties == null ? null
|
|
|
|
String tags = consumerProperties.getExtension().getTags();
|
|
|
|
: consumerProperties.getExtension().getTags();
|
|
|
|
Boolean isOrderly = consumerProperties.getExtension().getOrderly();
|
|
|
|
Boolean isOrderly = consumerProperties == null ? false
|
|
|
|
|
|
|
|
: consumerProperties.getExtension().getOrderly();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
|
|
|
|
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group,
|
|
|
|
destination, consumerProperties);
|
|
|
|
destination, consumerProperties);
|
|
|
|
|
|
|
|
|
|
|
|
final CloudStreamMessageListener listener = isOrderly
|
|
|
|
final CloudStreamMessageListener listener = isOrderly
|
|
|
|
? new CloudStreamMessageListenerOrderly(instrumentationManager)
|
|
|
|
? new CloudStreamMessageListenerOrderly()
|
|
|
|
: new CloudStreamMessageListenerConcurrently(instrumentationManager);
|
|
|
|
: new CloudStreamMessageListenerConcurrently();
|
|
|
|
|
|
|
|
|
|
|
|
if (retryTemplate != null) {
|
|
|
|
if (retryTemplate != null) {
|
|
|
|
retryTemplate.registerListener(listener);
|
|
|
|
retryTemplate.registerListener(listener);
|
|
|
@ -99,9 +99,10 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
: Arrays.stream(tags.split("\\|\\|")).map(String::trim)
|
|
|
|
: Arrays.stream(tags.split("\\|\\|")).map(String::trim)
|
|
|
|
.collect(Collectors.toSet());
|
|
|
|
.collect(Collectors.toSet());
|
|
|
|
|
|
|
|
|
|
|
|
consumerInstrumentation = instrumentationManager
|
|
|
|
Optional.ofNullable(instrumentationManager).ifPresent(manager -> {
|
|
|
|
.getConsumerInstrumentation(destination);
|
|
|
|
consumerInstrumentation = manager.getConsumerInstrumentation(destination);
|
|
|
|
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
|
|
|
|
manager.addHealthInstrumentation(consumerInstrumentation);
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
|
|
|
|
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
|
|
|
@ -111,10 +112,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
else {
|
|
|
|
else {
|
|
|
|
consumer.subscribe(destination, String.join(" || ", tagsSet));
|
|
|
|
consumer.subscribe(destination, String.join(" || ", tagsSet));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
consumerInstrumentation.markStartedSuccessfully();
|
|
|
|
Optional.ofNullable(consumerInstrumentation)
|
|
|
|
|
|
|
|
.ifPresent(c -> c.markStartedSuccessfully());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (MQClientException e) {
|
|
|
|
catch (MQClientException e) {
|
|
|
|
consumerInstrumentation.markStartFailed(e);
|
|
|
|
Optional.ofNullable(consumerInstrumentation)
|
|
|
|
|
|
|
|
.ifPresent(c -> c.markStartFailed(e));
|
|
|
|
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
|
|
|
|
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by "
|
|
|
|
+ e.getErrorMessage(), e);
|
|
|
|
+ e.getErrorMessage(), e);
|
|
|
|
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
|
|
|
|
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
|
|
|
@ -148,12 +151,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
|
|
|
|
|
|
|
|
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
|
|
|
|
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
|
|
|
|
|
|
|
|
|
|
|
|
private final InstrumentationManager instrumentationManager;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
|
|
|
|
|
|
|
|
this.instrumentationManager = instrumentationManager;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
|
|
|
|
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
|
|
|
|
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
|
|
|
|
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
|
|
|
|
try {
|
|
|
|
try {
|
|
|
@ -180,10 +177,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
else {
|
|
|
|
Acknowledgement result = doSendMsgs(msgs, null);
|
|
|
|
Acknowledgement result = doSendMsgs(msgs, null);
|
|
|
|
instrumentationManager
|
|
|
|
Optional.ofNullable(
|
|
|
|
.getConsumerInstrumentation(
|
|
|
|
RocketMQInboundChannelAdapter.this.instrumentationManager)
|
|
|
|
RocketMQInboundChannelAdapter.this.destination)
|
|
|
|
.ifPresent(manager -> {
|
|
|
|
.markConsumed();
|
|
|
|
manager.getConsumerInstrumentation(
|
|
|
|
|
|
|
|
RocketMQInboundChannelAdapter.this.destination)
|
|
|
|
|
|
|
|
.markConsumed();
|
|
|
|
|
|
|
|
});
|
|
|
|
return result;
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -191,10 +191,13 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
logger.error(
|
|
|
|
logger.error(
|
|
|
|
"RocketMQ Message hasn't been processed successfully. Caused by ",
|
|
|
|
"RocketMQ Message hasn't been processed successfully. Caused by ",
|
|
|
|
e);
|
|
|
|
e);
|
|
|
|
instrumentationManager
|
|
|
|
Optional.ofNullable(
|
|
|
|
.getConsumerInstrumentation(
|
|
|
|
RocketMQInboundChannelAdapter.this.instrumentationManager)
|
|
|
|
RocketMQInboundChannelAdapter.this.destination)
|
|
|
|
.ifPresent(manager -> {
|
|
|
|
.markConsumedFailure();
|
|
|
|
manager.getConsumerInstrumentation(
|
|
|
|
|
|
|
|
RocketMQInboundChannelAdapter.this.destination)
|
|
|
|
|
|
|
|
.markConsumedFailure();
|
|
|
|
|
|
|
|
});
|
|
|
|
throw new RuntimeException(
|
|
|
|
throw new RuntimeException(
|
|
|
|
"RocketMQ Message hasn't been processed successfully. Caused by ",
|
|
|
|
"RocketMQ Message hasn't been processed successfully. Caused by ",
|
|
|
|
e);
|
|
|
|
e);
|
|
|
@ -232,16 +235,22 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
public <T, E extends Throwable> void close(RetryContext context,
|
|
|
|
public <T, E extends Throwable> void close(RetryContext context,
|
|
|
|
RetryCallback<T, E> callback, Throwable throwable) {
|
|
|
|
RetryCallback<T, E> callback, Throwable throwable) {
|
|
|
|
if (throwable != null) {
|
|
|
|
if (throwable != null) {
|
|
|
|
instrumentationManager
|
|
|
|
Optional.ofNullable(
|
|
|
|
.getConsumerInstrumentation(
|
|
|
|
RocketMQInboundChannelAdapter.this.instrumentationManager)
|
|
|
|
RocketMQInboundChannelAdapter.this.destination)
|
|
|
|
.ifPresent(manager -> {
|
|
|
|
.markConsumedFailure();
|
|
|
|
manager.getConsumerInstrumentation(
|
|
|
|
|
|
|
|
RocketMQInboundChannelAdapter.this.destination)
|
|
|
|
|
|
|
|
.markConsumedFailure();
|
|
|
|
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
else {
|
|
|
|
instrumentationManager
|
|
|
|
Optional.ofNullable(
|
|
|
|
.getConsumerInstrumentation(
|
|
|
|
RocketMQInboundChannelAdapter.this.instrumentationManager)
|
|
|
|
RocketMQInboundChannelAdapter.this.destination)
|
|
|
|
.ifPresent(manager -> {
|
|
|
|
.markConsumed();
|
|
|
|
manager.getConsumerInstrumentation(
|
|
|
|
|
|
|
|
RocketMQInboundChannelAdapter.this.destination)
|
|
|
|
|
|
|
|
.markConsumed();
|
|
|
|
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -254,11 +263,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
protected class CloudStreamMessageListenerConcurrently
|
|
|
|
protected class CloudStreamMessageListenerConcurrently
|
|
|
|
extends CloudStreamMessageListener implements MessageListenerConcurrently {
|
|
|
|
extends CloudStreamMessageListener implements MessageListenerConcurrently {
|
|
|
|
|
|
|
|
|
|
|
|
public CloudStreamMessageListenerConcurrently(
|
|
|
|
|
|
|
|
InstrumentationManager instrumentationManager) {
|
|
|
|
|
|
|
|
super(instrumentationManager);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
|
|
|
|
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
|
|
|
|
ConsumeConcurrentlyContext context) {
|
|
|
|
ConsumeConcurrentlyContext context) {
|
|
|
@ -272,11 +276,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
|
|
|
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
|
|
|
|
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener
|
|
|
|
implements MessageListenerOrderly {
|
|
|
|
implements MessageListenerOrderly {
|
|
|
|
|
|
|
|
|
|
|
|
public CloudStreamMessageListenerOrderly(
|
|
|
|
|
|
|
|
InstrumentationManager instrumentationManager) {
|
|
|
|
|
|
|
|
super(instrumentationManager);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
|
|
|
|
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
|
|
|
|
ConsumeOrderlyContext context) {
|
|
|
|
ConsumeOrderlyContext context) {
|
|
|
|