|
|
|
@ -3,6 +3,7 @@ package com.alibaba.arthas.channel.server.message.impl;
|
|
|
|
|
import com.alibaba.arthas.channel.server.conf.ScheduledExecutorConfig;
|
|
|
|
|
import com.alibaba.arthas.channel.server.message.MessageExchangeException;
|
|
|
|
|
import com.alibaba.arthas.channel.server.message.MessageExchangeService;
|
|
|
|
|
import com.alibaba.arthas.channel.server.message.TopicNotFoundException;
|
|
|
|
|
import com.alibaba.arthas.channel.server.message.topic.ActionRequestTopic;
|
|
|
|
|
import com.alibaba.arthas.channel.server.message.topic.Topic;
|
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
@ -117,7 +118,7 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
|
|
|
|
|
private TopicData getAndCheckTopicExists(Topic topic) throws MessageExchangeException {
|
|
|
|
|
TopicData topicData = topicMap.get(topic);
|
|
|
|
|
if (topicData == null) {
|
|
|
|
|
throw new MessageExchangeException("topic is not exists: " + topic);
|
|
|
|
|
throw new TopicNotFoundException("topic is not exists: " + topic);
|
|
|
|
|
}
|
|
|
|
|
return topicData;
|
|
|
|
|
}
|
|
|
|
@ -128,7 +129,11 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
|
|
|
|
|
TopicData topicData = getAndCheckTopicExists(topic);
|
|
|
|
|
return Mono.justOrEmpty(topicData.messageQueue.poll(timeout, TimeUnit.MILLISECONDS));
|
|
|
|
|
} catch (Throwable e) {
|
|
|
|
|
return Mono.error(new MessageExchangeException("poll message failure: "+e.getMessage(), e));
|
|
|
|
|
if (e instanceof TopicNotFoundException) {
|
|
|
|
|
return Mono.error(e);
|
|
|
|
|
} else {
|
|
|
|
|
return Mono.error(new MessageExchangeException("poll message failure: " + e.getMessage(), e));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|