add rocketmq produce and consumer test

pull/2750/head
windwheel committed by Steve Rao
parent c5701b4753
commit 29cbbd1d73

@ -0,0 +1,18 @@
package com.alibaba.cloud.rocketmq;
public class SimpleMsg {
private String msg;
public SimpleMsg(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}

@ -1,14 +1,21 @@
package com.alibaba.cloud.stream.binder.rocketmq;
import java.util.HashMap;
import java.util.Map;
import com.alibaba.cloud.rocketmq.SimpleMsg;
import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration;
import com.alibaba.cloud.testsupport.SpringCloudAlibaba;
import com.alibaba.cloud.testsupport.TestExtend;
import org.apache.rocketmq.common.message.MessageConst;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.StringUtils;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
@ -18,6 +25,9 @@ import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
@ -26,6 +36,7 @@ import org.springframework.core.ParameterizedTypeReference;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
@ -33,11 +44,12 @@ import static com.alibaba.cloud.testsupport.Constant.TIME_OUT;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
@SpringCloudAlibaba(composeFiles = "docker/rocket-compose-test.yml", serviceName = "rocketmq-standalone")
@TestExtend(time = 10 * TIME_OUT)
@TestExtend(time = 11 * TIME_OUT)
@RunWith(SpringRunner.class)
@EnableBinding(RocketmqProduceAndConsumerTests.TestConfig.PolledProcessor.class)
@EnableBinding({Processor.class, RocketmqProduceAndConsumerTests.PolledProcessor.class})
@SpringBootTest(classes = RocketmqProduceAndConsumerTests.TestConfig.class, webEnvironment = NONE, properties = {
"spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876,127.0.0.1:9877",
"spring.cloud.stream.rocketmq.binder.group=test-group1",
"spring.cloud.stream.pollable-source=pollable",
"spring.cloud.stream.bindings.output.destination=TopicOrderTest",
"spring.cloud.stream.bindings.output.content-type=application/json",
@ -45,14 +57,10 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironmen
"spring.cloud.stream.bindings.input1.content-type=application/json",
"spring.cloud.stream.bindings.input1.group=test-group1",
"spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true",
"spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",
"spring.cloud.stream.bindings.input2.destination=TopicOrderTest",
"spring.cloud.stream.bindings.input2.content-type=application/json",
"spring.cloud.stream.bindings.input2.group=test-group2",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.push.orderly=false",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.subscription=tag1" })
"spring.cloud.stream.bindings.input1.consumer.maxAttempts=1"})
public class RocketmqProduceAndConsumerTests {
@Test
public void testConsumeAndProduce(){
@ -66,44 +74,44 @@ public class RocketmqProduceAndConsumerTests {
@Bean(name = "produce")
@Profile("test")
public ApplicationRunner produceRunner(PollableMessageSource source,
MessageChannel dest) {
public ApplicationRunner produceRunner(MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(m -> {
String payload = (String) m.getPayload();
dest.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
if (result) {
System.out.println("Processed a message");
}
else {
System.out.println("Nothing to do");
}
String key = "KEY";
String messageId = "1";
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_TAGS, "TagA");
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, messageId);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ"), headers);
dest.send(msg);
Thread.sleep(5_000);
}
};
}
@Bean(name = "consumer")
@Profile("test")
@DependsOn(value = "produce")
public ApplicationRunner consumerRunner(MessageChannel dest) {
public ApplicationRunner consumerRunner(PollableMessageSource dest) {
return args -> {
((SubscribableChannel) dest).subscribe(message -> System.out.println(message));
((SubscribableChannel) dest).subscribe(message -> Assertions.assertEquals(message,"Hello RocketMQ" ));
};
}
public interface PolledProcessor {
}
/**
*
*/
public interface PolledProcessor extends Source, Sink {
@Input
PollableMessageSource source();
String CUSTOMIZE_OUTPUT = "output";
@Output
MessageChannel dest();
@Output(CUSTOMIZE_OUTPUT)
MessageChannel output();
}
String CUSTOMIZE_INPUT = "input1";
@Input(CUSTOMIZE_INPUT)
SubscribableChannel input();
}
}

Loading…
Cancel
Save