Merge remote-tracking branch 'origin/2021.x' into 2021.x
commit
8a4cdf26d9
@ -0,0 +1,57 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<parent>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-alibaba-examples</artifactId>
|
||||
<version>${revision}</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
|
||||
<artifactId>rocketmq-pollable-consume-example</artifactId>
|
||||
<name>Spring Cloud Starter Stream Alibaba RocketMQ PollableMessageSource Consume Example</name>
|
||||
<description>Example demonstrating how to use rocketmq to produce, and consume by PollableMessageSource</description>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-json</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>rocketmq-example-common</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>${maven-deploy-plugin.version}</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
@ -0,0 +1,86 @@
|
||||
/*
|
||||
* Copyright 2013-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.examples.orderly;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.alibaba.cloud.examples.common.SimpleMsg;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.binder.PollableMessageSource;
|
||||
import org.springframework.cloud.stream.function.StreamBridge;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
|
||||
/**
|
||||
* @author sorie
|
||||
*/
|
||||
@SpringBootApplication
|
||||
public class RocketMQPollableConsumeApplication {
|
||||
|
||||
private static final Logger log = LoggerFactory
|
||||
.getLogger(RocketMQPollableConsumeApplication.class);
|
||||
|
||||
@Autowired
|
||||
private StreamBridge streamBridge;
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(RocketMQPollableConsumeApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ApplicationRunner producer() {
|
||||
return args -> {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
String key = "KEY" + i;
|
||||
Map<String, Object> headers = new HashMap<>();
|
||||
headers.put(MessageConst.PROPERTY_KEYS, key);
|
||||
Message<SimpleMsg> msg = new GenericMessage(
|
||||
new SimpleMsg("Hello RocketMQ " + i), headers);
|
||||
streamBridge.send("producer-out-0", msg);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ApplicationRunner pollableRunner(PollableMessageSource destIn) {
|
||||
return args -> {
|
||||
while (true) {
|
||||
try {
|
||||
if (!destIn.poll((m) -> {
|
||||
SimpleMsg newPayload = (SimpleMsg) m.getPayload();
|
||||
System.out.println(newPayload.getMsg());
|
||||
}, new ParameterizedTypeReference<SimpleMsg>() { })) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
// handle failure
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
server:
|
||||
port: 28089
|
||||
spring:
|
||||
application:
|
||||
name: rocketmq-pollable-consume-example
|
||||
cloud:
|
||||
stream:
|
||||
pollable-source: pollable
|
||||
rocketmq:
|
||||
binder:
|
||||
name-server: localhost:9876
|
||||
bindings:
|
||||
producer-out-0:
|
||||
producer:
|
||||
group: output_1
|
||||
bindings:
|
||||
producer-out-0:
|
||||
destination: pollable
|
||||
pollable-in-0:
|
||||
destination: pollable
|
||||
logging:
|
||||
level:
|
||||
org.springframework.context.support: debug
|
@ -0,0 +1,189 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.nacos.utils;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Enumeration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.cloud.commons.util.InetUtils;
|
||||
import org.springframework.cloud.commons.util.InetUtilsProperties;
|
||||
|
||||
/**
|
||||
* @author HH
|
||||
*/
|
||||
public class InetIPv6Util implements Closeable {
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
private final Log log = LogFactory.getLog(InetIPv6Util.class);
|
||||
|
||||
private final InetUtilsProperties properties;
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.executorService.shutdown();
|
||||
}
|
||||
|
||||
public InetIPv6Util(final InetUtilsProperties properties) {
|
||||
this.properties = properties;
|
||||
this.executorService = Executors.newSingleThreadExecutor((r) -> {
|
||||
Thread thread = new Thread(r);
|
||||
thread.setName("spring.cloud.alibaba.inetIPV6Util");
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
});
|
||||
}
|
||||
|
||||
public InetUtils.HostInfo findFirstNonLoopbackHostInfo() {
|
||||
InetAddress address = this.findFirstNonLoopbackIPv6Address();
|
||||
if (address != null) {
|
||||
return this.convertAddress(address);
|
||||
}
|
||||
else {
|
||||
InetUtils.HostInfo hostInfo = new InetUtils.HostInfo();
|
||||
this.properties.setDefaultIpAddress("0:0:0:0:0:0:0:1");
|
||||
hostInfo.setHostname(this.properties.getDefaultHostname());
|
||||
hostInfo.setIpAddress(this.properties.getDefaultIpAddress());
|
||||
return hostInfo;
|
||||
}
|
||||
}
|
||||
|
||||
public InetAddress findFirstNonLoopbackIPv6Address() {
|
||||
InetAddress address = null;
|
||||
|
||||
try {
|
||||
int lowest = Integer.MAX_VALUE;
|
||||
for (Enumeration<NetworkInterface> nics = NetworkInterface
|
||||
.getNetworkInterfaces(); nics.hasMoreElements(); ) {
|
||||
NetworkInterface ifc = nics.nextElement();
|
||||
if (ifc.isUp()) {
|
||||
log.trace("Testing interface:" + ifc.getDisplayName());
|
||||
if (ifc.getIndex() < lowest || address == null) {
|
||||
lowest = ifc.getIndex();
|
||||
}
|
||||
else if (address != null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!ignoreInterface(ifc.getDisplayName())) {
|
||||
for (Enumeration<InetAddress> addrs = ifc
|
||||
.getInetAddresses(); addrs.hasMoreElements(); ) {
|
||||
InetAddress inetAddress = addrs.nextElement();
|
||||
if (inetAddress instanceof Inet6Address
|
||||
&& !inetAddress.isLoopbackAddress()
|
||||
&& isPreferredAddress(inetAddress)) {
|
||||
log.trace("Found non-loopback interface: "
|
||||
+ ifc.getDisplayName());
|
||||
address = inetAddress;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error("Cannot get first non-loopback address", e);
|
||||
}
|
||||
|
||||
if (address != null) {
|
||||
return address;
|
||||
}
|
||||
|
||||
try {
|
||||
return InetAddress.getLocalHost();
|
||||
}
|
||||
catch (UnknownHostException e) {
|
||||
log.warn("Unable to retrieve localhost");
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public String findIPv6Address() {
|
||||
String ip = findFirstNonLoopbackHostInfo().getIpAddress();
|
||||
int index = ip.indexOf('%');
|
||||
ip = index > 0 ? ip.substring(0, index) : ip;
|
||||
return iPv6Format(ip);
|
||||
}
|
||||
|
||||
public String iPv6Format(String ip) {
|
||||
return "[" + ip + "]";
|
||||
}
|
||||
|
||||
boolean isPreferredAddress(InetAddress address) {
|
||||
if (this.properties.isUseOnlySiteLocalInterfaces()) {
|
||||
final boolean siteLocalAddress = address.isSiteLocalAddress();
|
||||
if (!siteLocalAddress) {
|
||||
log.trace("Ignoring address" + address.getHostAddress());
|
||||
}
|
||||
return siteLocalAddress;
|
||||
}
|
||||
final List<String> preferredNetworks = this.properties.getPreferredNetworks();
|
||||
if (preferredNetworks.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
for (String regex : preferredNetworks) {
|
||||
final String hostAddress = address.getHostAddress();
|
||||
if (hostAddress.matches(regex) || hostAddress.startsWith(regex)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
log.trace("Ignoring address: " + address.getHostAddress());
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean ignoreInterface(String interfaceName) {
|
||||
for (String regex : this.properties.getIgnoredInterfaces()) {
|
||||
if (interfaceName.matches(regex)) {
|
||||
log.trace("Ignoring interface: " + interfaceName);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public InetUtils.HostInfo convertAddress(final InetAddress address) {
|
||||
InetUtils.HostInfo hostInfo = new InetUtils.HostInfo();
|
||||
Future<String> result = this.executorService.submit(address::getHostName);
|
||||
|
||||
String hostname;
|
||||
try {
|
||||
hostname = result.get(this.properties.getTimeoutSeconds(), TimeUnit.SECONDS);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.info("Cannot determine local hostname");
|
||||
hostname = "localhost";
|
||||
}
|
||||
hostInfo.setHostname(hostname);
|
||||
hostInfo.setIpAddress(address.getHostAddress());
|
||||
return hostInfo;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.nacos.utils;
|
||||
|
||||
import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled;
|
||||
import org.springframework.cloud.commons.util.InetUtilsProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author HH
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ConditionalOnDiscoveryEnabled
|
||||
@ConditionalOnNacosDiscoveryEnabled
|
||||
public class UtilIPv6AutoConfiguration {
|
||||
|
||||
public UtilIPv6AutoConfiguration() {
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public InetIPv6Util inetIPv6Util(InetUtilsProperties properties) {
|
||||
return new InetIPv6Util(properties);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,149 @@
|
||||
/*
|
||||
* Copyright 2013-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.tests.nacos.config;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import com.alibaba.cloud.nacos.NacosConfigAutoConfiguration;
|
||||
import com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration;
|
||||
import com.alibaba.cloud.nacos.NacosConfigManager;
|
||||
import com.alibaba.cloud.nacos.NacosConfigProperties;
|
||||
import com.alibaba.cloud.nacos.endpoint.NacosConfigEndpointAutoConfiguration;
|
||||
import com.alibaba.cloud.testsupport.SpringCloudAlibaba;
|
||||
import com.alibaba.cloud.testsupport.TestExtend;
|
||||
import com.alibaba.nacos.api.PropertyKeyConst;
|
||||
import com.alibaba.nacos.api.config.ConfigFactory;
|
||||
import com.alibaba.nacos.api.config.ConfigService;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import static com.alibaba.cloud.testsupport.Constant.TIME_OUT;
|
||||
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
|
||||
|
||||
@SpringCloudAlibaba(composeFiles = "docker/nacos-compose-test.yml", serviceName = "nacos-standalone")
|
||||
@TestExtend(time = 4 * TIME_OUT)
|
||||
@SpringBootTest(classes = NacosConfigurationExtConfigTests.TestConfig.class, webEnvironment = NONE, properties = {
|
||||
"spring.application.name=myTestService1", "spring.profiles.active=dev,test",
|
||||
"spring.cloud.nacos.config.server-addr=127.0.0.1:8848",
|
||||
"spring.cloud.nacos.config.username=nacos",
|
||||
"spring.cloud.nacos.config.password=nacos",
|
||||
"spring.cloud.nacos.config.encode=utf-8",
|
||||
"spring.cloud.nacos.config.timeout=1000",
|
||||
"spring.cloud.nacos.config.file-extension=properties",
|
||||
"spring.cloud.nacos.config.extension-configs[0].data-id=ext-config-common01.properties",
|
||||
"spring.cloud.nacos.config.extension-configs[1].data-id=ext-config-common02.properties",
|
||||
"spring.cloud.nacos.config.extension-configs[1].group=GLOBAL_GROUP",
|
||||
"spring.cloud.nacos.config.shared-dataids=common1.properties,common2.properties",
|
||||
"spring.cloud.bootstrap.enabled=true" })
|
||||
public class NacosConfigurationExtConfigTests {
|
||||
|
||||
/**
|
||||
* nacos upload conf file.
|
||||
*/
|
||||
public static final String YAML_CONTENT = "configdata:\n" + " user:\n"
|
||||
+ " age: 22\n" + " name: freeman1123\n" + " map:\n"
|
||||
+ " hobbies:\n" + " - art\n" + " - programming\n"
|
||||
+ " - movie\n" + " intro: Hello, I'm freeman\n"
|
||||
+ " extra: yo~\n" + " users:\n" + " - name: dad\n"
|
||||
+ " age: 20\n" + " - name: mom\n" + " age: 18";
|
||||
|
||||
@Autowired
|
||||
private NacosConfigProperties nacosConfigProperties;
|
||||
|
||||
private ConfigService remoteService;
|
||||
|
||||
private NacosConfigManager nacosConfigManager;
|
||||
|
||||
@BeforeAll
|
||||
public static void setUp() {
|
||||
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void prepare() throws NacosException {
|
||||
Properties nacosSettings = new Properties();
|
||||
String serverAddress = "127.0.0.1:8848";
|
||||
nacosSettings.put(PropertyKeyConst.SERVER_ADDR, serverAddress);
|
||||
nacosSettings.put(PropertyKeyConst.USERNAME, "nacos");
|
||||
nacosSettings.put(PropertyKeyConst.PASSWORD, "nacos");
|
||||
|
||||
remoteService = ConfigFactory.createConfigService(nacosSettings);
|
||||
nacosConfigManager = new NacosConfigManager(nacosConfigProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void contextLoads() throws NacosException {
|
||||
ConfigService localService = nacosConfigManager.getConfigService();
|
||||
updateConfig();
|
||||
String localContent = fetchConfig(localService, "nacos-config-refresh.yml",
|
||||
"DEFAULT_GROUP", TIME_OUT);
|
||||
String remoteContent = fetchConfig(remoteService, "nacos-config-refresh.yml",
|
||||
"DEFAULT_GROUP", TIME_OUT);
|
||||
Assertions.assertEquals(localContent, remoteContent);
|
||||
|
||||
List<NacosConfigProperties.Config> mockConfig = mockExtConfigs();
|
||||
|
||||
List<NacosConfigProperties.Config> extConfig = nacosConfigProperties
|
||||
.getExtensionConfigs();
|
||||
Assertions.assertArrayEquals(extConfig.toArray(), mockConfig.toArray());
|
||||
|
||||
}
|
||||
|
||||
private String fetchConfig(ConfigService configService, String dataId, String group,
|
||||
long timeoutMs) throws NacosException {
|
||||
return configService.getConfig(dataId, group, timeoutMs);
|
||||
}
|
||||
|
||||
private void updateConfig() throws NacosException {
|
||||
remoteService.publishConfig("nacos-config-refresh.yml", "DEFAULT_GROUP",
|
||||
YAML_CONTENT, "yaml");
|
||||
}
|
||||
|
||||
public static List<NacosConfigProperties.Config> mockExtConfigs() {
|
||||
List<NacosConfigProperties.Config> mockConfig = new ArrayList<>();
|
||||
NacosConfigProperties.Config config1 = new NacosConfigProperties.Config();
|
||||
config1.setDataId("ext-config-common01.properties");
|
||||
config1.setGroup("DEFAULT_GROUP");
|
||||
config1.setRefresh(false);
|
||||
NacosConfigProperties.Config config2 = new NacosConfigProperties.Config();
|
||||
config2.setDataId("ext-config-common02.properties");
|
||||
config2.setGroup("GLOBAL_GROUP");
|
||||
config2.setRefresh(false);
|
||||
mockConfig.add(config1);
|
||||
mockConfig.add(config2);
|
||||
return mockConfig;
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableAutoConfiguration
|
||||
@ImportAutoConfiguration({ NacosConfigEndpointAutoConfiguration.class,
|
||||
NacosConfigAutoConfiguration.class, NacosConfigBootstrapConfiguration.class })
|
||||
public static class TestConfig {
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<name>Rocketmq Test</name>
|
||||
|
||||
<modules>
|
||||
<module>rocketmq-stream-test</module>
|
||||
</modules>
|
||||
<parent>
|
||||
<artifactId>spring-cloud-alibaba-tests</artifactId>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<packaging>pom</packaging>
|
||||
<artifactId>rocketmq-tests</artifactId>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.11</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-test-support</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
@ -0,0 +1,49 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>rocketmq-tests</artifactId>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>rocketmq-stream-test</artifactId>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<name>Rocketmq Stream Test</name>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-alibaba-test-support</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>3.0.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Copyright 2013-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class RocketmqStreamApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(RocketmqStreamApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,98 @@
|
||||
/*
|
||||
* Copyright 2013-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import com.alibaba.cloud.testsupport.SpringCloudAlibaba;
|
||||
import com.alibaba.cloud.testsupport.TestExtend;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import static com.alibaba.cloud.testsupport.Constant.TIME_OUT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
|
||||
|
||||
@SpringCloudAlibaba(composeFiles = "docker/rocket-compose-test.yml", serviceName = "rocketmq-standalone")
|
||||
@TestExtend(time = 6 * TIME_OUT)
|
||||
@SpringBootTest(classes = RocketMQAutoConfigurationTests.TestConfig.class, webEnvironment = NONE, properties = {
|
||||
"spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876,127.0.0.1:9877",
|
||||
"spring.cloud.stream.bindings.output.destination=TopicOrderTest",
|
||||
"spring.cloud.stream.bindings.output.content-type=application/json",
|
||||
|
||||
"spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
|
||||
"spring.cloud.stream.bindings.input1.content-type=application/json",
|
||||
"spring.cloud.stream.bindings.input1.group=test-group1",
|
||||
"spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true",
|
||||
"spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",
|
||||
"spring.cloud.stream.bindings.input2.destination=TopicOrderTest",
|
||||
"spring.cloud.stream.bindings.input2.content-type=application/json",
|
||||
"spring.cloud.stream.bindings.input2.group=test-group2",
|
||||
"spring.cloud.stream.rocketmq.bindings.input2.consumer.push.orderly=false",
|
||||
"spring.cloud.stream.rocketmq.bindings.input2.consumer.subscription=tag1" })
|
||||
public class RocketMQAutoConfigurationTests {
|
||||
|
||||
@Autowired
|
||||
private RocketMQBinderConfigurationProperties binderConfigurationProperties;
|
||||
@Autowired
|
||||
private RocketMQExtendedBindingProperties bindingProperties;
|
||||
|
||||
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
|
||||
.withConfiguration(
|
||||
AutoConfigurations.of(RocketMQBinderAutoConfiguration.class));
|
||||
|
||||
@BeforeAll
|
||||
public static void setUp() {
|
||||
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void prepare() {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProperties() {
|
||||
this.contextRunner.run(context -> {
|
||||
assertThat(binderConfigurationProperties.getNameServer())
|
||||
.isEqualTo("127.0.0.1:9876,127.0.0.1:9877");
|
||||
assertThat(bindingProperties.getExtendedConsumerProperties("input2")
|
||||
.getSubscription()).isEqualTo("tag1");
|
||||
assertThat(bindingProperties.getExtendedConsumerProperties("input2").getPush()
|
||||
.getOrderly()).isFalse();
|
||||
assertThat(bindingProperties.getExtendedConsumerProperties("input1").getPush()
|
||||
.getOrderly()).isTrue();
|
||||
});
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableAutoConfiguration
|
||||
@ImportAutoConfiguration({RocketMQBinderAutoConfiguration.class })
|
||||
public static class TestConfig {
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Copyright 2013-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.fixture.RocketmqBinderProcessor;
|
||||
import com.alibaba.cloud.testsupport.SpringCloudAlibaba;
|
||||
import com.alibaba.cloud.testsupport.TestExtend;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.stream.test.binder.MessageCollector;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
|
||||
import static com.alibaba.cloud.testsupport.Constant.TIME_OUT;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
|
||||
import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesPayloadThat;
|
||||
|
||||
@SpringCloudAlibaba(composeFiles = "docker/rocket-compose-test.yml", serviceName = "rocketmq-standalone")
|
||||
@TestExtend(time = 6 * TIME_OUT)
|
||||
@DirtiesContext
|
||||
@ImportAutoConfiguration(value = {}, exclude = { DataSourceAutoConfiguration.class,
|
||||
TransactionAutoConfiguration.class,
|
||||
DataSourceTransactionManagerAutoConfiguration.class })
|
||||
@SpringBootTest(classes = RocketmqBinderProcessor.class, webEnvironment = NONE, properties = {
|
||||
"spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876,127.0.0.1:9877",
|
||||
"spring.cloud.stream.rocketmq.binder.group=flaky-group",
|
||||
// "spring.cloud.stream.rocketmq.binder.consumer-group=flaky-group",
|
||||
// "spring.cloud.stream.pollable-source=pollable",
|
||||
"spring.cloud.stream.bindings.uppercaseFunction-out-0.destination=TopicOrderTest",
|
||||
"spring.cloud.stream.bindings.uppercaseFunction-out-0.content-type=application/json",
|
||||
"spring.cloud.stream.bindings.uppercaseFunction-out-0.group=test-group1",
|
||||
"spring.cloud.stream.bindings.uppercaseFunction-in-0.destination=TopicOrderTest",
|
||||
"spring.cloud.stream.bindings.uppercaseFunction-in-0.content-type=application/json",
|
||||
"spring.cloud.stream.bindings.uppercaseFunction-in-0.group=test-group1",
|
||||
"spring.cloud.stream.bindings.uppercaseFunction-in-0.consumer.push.orderly=true",
|
||||
"spring.cloud.stream.bindings.uppercaseFunction-in-0.consumer.maxAttempts=1" })
|
||||
public class RocketmqProduceAndConsumerTests {
|
||||
|
||||
@Autowired
|
||||
private MessageCollector collector;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("uppercaseFunction-in-0")
|
||||
private MessageChannel input1;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("uppercaseFunction-out-0")
|
||||
private MessageChannel output;
|
||||
|
||||
@BeforeAll
|
||||
public static void prepare() {
|
||||
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
String key = "KEY";
|
||||
String messageId = "1";
|
||||
Map<String, Object> headers = new HashMap<>();
|
||||
headers.put(MessageConst.PROPERTY_KEYS, key);
|
||||
headers.put(MessageConst.PROPERTY_TAGS, "TagA");
|
||||
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, messageId);
|
||||
Message<String> msg = new GenericMessage(JSON.toJSONString("Hello RocketMQ"),
|
||||
headers);
|
||||
input1.send(msg);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumeAndProduce() throws Exception {
|
||||
BlockingQueue<Message<?>> messages = this.collector.forChannel(this.output);
|
||||
|
||||
MatcherAssert.assertThat(messages, receivesPayloadThat(is("\"HELLO ROCKETMQ\"")));
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2013-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.fixture;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
@SpringBootApplication
|
||||
public class RocketmqBinderProcessor {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(RocketmqBinderProcessor.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<String, String> uppercaseFunction() {
|
||||
return String::toUpperCase;
|
||||
}
|
||||
}
|
@ -0,0 +1,7 @@
|
||||
brokerClusterName = DefaultCluster
|
||||
brokerName = broker-a
|
||||
brokerId = 0
|
||||
deleteWhen = 04
|
||||
fileReservedTime = 48
|
||||
brokerRole = ASYNC_MASTER
|
||||
flushDiskType = ASYNC_FLUSH
|
@ -0,0 +1,7 @@
|
||||
brokerClusterName = DefaultCluster
|
||||
brokerName = broker-b
|
||||
brokerId = 0
|
||||
deleteWhen = 04
|
||||
fileReservedTime = 48
|
||||
brokerRole = ASYNC_MASTER
|
||||
flushDiskType = ASYNC_FLUSH
|
@ -0,0 +1,46 @@
|
||||
services:
|
||||
#Service for nameserver
|
||||
namesrv:
|
||||
image: apache/rocketmq:4.9.3
|
||||
container_name: rmqnamesrv
|
||||
ports:
|
||||
- 9876:9876
|
||||
volumes:
|
||||
- ./data/namesrv/logs:/home/rocketmq/logs
|
||||
command: sh mqnamesrv
|
||||
|
||||
#Service for broker
|
||||
broker:
|
||||
image: apache/rocketmq:4.9.3
|
||||
container_name: rmqbroker
|
||||
links:
|
||||
- namesrv
|
||||
ports:
|
||||
- 10909:10909
|
||||
- 10911:10911
|
||||
- 10912:10912
|
||||
environment:
|
||||
- NAMESRV_ADDR=namesrv:9876
|
||||
volumes:
|
||||
- ./data/broker/logs:/home/rocketmq/logs
|
||||
- ./data/broker/store:/home/rocketmq/store
|
||||
- ./data/broker/conf/broker.conf:/opt/rocketmq-4.9.3/conf/broker.conf
|
||||
command: sh mqbroker -c /opt/rocketmq-4.9.3/conf/broker.conf
|
||||
|
||||
#Service for another broker -- broker1
|
||||
broker1:
|
||||
image: apache/rocketmq:4.9.3
|
||||
container_name: rmqbroker-b
|
||||
links:
|
||||
- namesrv
|
||||
ports:
|
||||
- 10929:10909
|
||||
- 10931:10911
|
||||
- 10932:10912
|
||||
environment:
|
||||
- NAMESRV_ADDR=namesrv:9876
|
||||
volumes:
|
||||
- ./data1/broker/logs:/home/rocketmq/logs
|
||||
- ./data1/broker/store:/home/rocketmq/store
|
||||
- ./data1/broker/conf/broker.conf:/opt/rocketmq-4.9.3/conf/broker.conf
|
||||
command: sh mqbroker -c /opt/rocketmq-4.9.3/conf/broker.conf
|
Loading…
Reference in New Issue