feature: Nacos failure tolerance.

增加 nacos 失败容错功能, 在查询失败时使用缓存值
pull/2360/head
Freeman Lau 3 years ago
parent 60f7fd87a3
commit 9a4a7d6f42

@ -18,10 +18,12 @@ package com.alibaba.cloud.nacos.discovery;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.DiscoveryClient;
@ -29,6 +31,7 @@ import org.springframework.cloud.client.discovery.DiscoveryClient;
* @author xiaojing * @author xiaojing
* @author renhaojun * @author renhaojun
* @author echooymxq * @author echooymxq
* @author freeman
*/ */
public class NacosDiscoveryClient implements DiscoveryClient { public class NacosDiscoveryClient implements DiscoveryClient {
@ -41,6 +44,9 @@ public class NacosDiscoveryClient implements DiscoveryClient {
private NacosServiceDiscovery serviceDiscovery; private NacosServiceDiscovery serviceDiscovery;
@Value("${spring.cloud.nacos.discovery.failure-tolerance-enabled:true}")
private boolean failureToleranceEnabled = true;
public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) { public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
this.serviceDiscovery = nacosServiceDiscovery; this.serviceDiscovery = nacosServiceDiscovery;
} }
@ -53,9 +59,15 @@ public class NacosDiscoveryClient implements DiscoveryClient {
@Override @Override
public List<ServiceInstance> getInstances(String serviceId) { public List<ServiceInstance> getInstances(String serviceId) {
try { try {
return serviceDiscovery.getInstances(serviceId); return Optional.of(serviceDiscovery.getInstances(serviceId)).map(instances -> {
ServiceCache.setInstances(serviceId, instances);
return instances;
}).get();
} }
catch (Exception e) { catch (Exception e) {
if (failureToleranceEnabled) {
return ServiceCache.getInstances(serviceId);
}
throw new RuntimeException( throw new RuntimeException(
"Can not get hosts from nacos server. serviceId: " + serviceId, e); "Can not get hosts from nacos server. serviceId: " + serviceId, e);
} }
@ -64,11 +76,14 @@ public class NacosDiscoveryClient implements DiscoveryClient {
@Override @Override
public List<String> getServices() { public List<String> getServices() {
try { try {
return serviceDiscovery.getServices(); return Optional.of(serviceDiscovery.getServices()).map(services -> {
ServiceCache.set(services);
return services;
}).get();
} }
catch (Exception e) { catch (Exception e) {
log.error("get service name from nacos server fail,", e); log.error("get service name from nacos server fail,", e);
return Collections.emptyList(); return failureToleranceEnabled ? ServiceCache.get() : Collections.emptyList();
} }
} }

@ -0,0 +1,41 @@
package com.alibaba.cloud.nacos.discovery;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.cloud.client.ServiceInstance;
/**
* Service cache.
* <p>
* Cache serviceIds and corresponding instances in Nacos.
*
* @author freeman
* @since 2022.0
*/
public class ServiceCache {
private static List<String> services = Collections.emptyList();
private static Map<String, List<ServiceInstance>> instancesMap = new ConcurrentHashMap<>();
public static void setInstances(String serviceId, List<ServiceInstance> instances) {
instancesMap.put(serviceId, Collections.unmodifiableList(instances));
}
public static List<ServiceInstance> getInstances(String serviceId) {
return Optional.ofNullable(instancesMap.get(serviceId)).orElse(Collections.emptyList());
}
public static void set(List<String> newServices) {
services = Collections.unmodifiableList(newServices);
}
public static List<String> get() {
return services;
}
}

@ -19,10 +19,12 @@ package com.alibaba.cloud.nacos.discovery.reactive;
import java.util.function.Function; import java.util.function.Function;
import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery; import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery;
import com.alibaba.cloud.nacos.discovery.ServiceCache;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -32,6 +34,7 @@ import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
/** /**
* @author <a href="mailto:echooy.mxq@gmail.com">echooymxq</a> * @author <a href="mailto:echooy.mxq@gmail.com">echooymxq</a>
* @author freeman
**/ **/
public class NacosReactiveDiscoveryClient implements ReactiveDiscoveryClient { public class NacosReactiveDiscoveryClient implements ReactiveDiscoveryClient {
@ -40,6 +43,9 @@ public class NacosReactiveDiscoveryClient implements ReactiveDiscoveryClient {
private NacosServiceDiscovery serviceDiscovery; private NacosServiceDiscovery serviceDiscovery;
@Value("${spring.cloud.nacos.discovery.failure-tolerance-enabled:true}")
private boolean failureToleranceEnabled = true;
public NacosReactiveDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) { public NacosReactiveDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
this.serviceDiscovery = nacosServiceDiscovery; this.serviceDiscovery = nacosServiceDiscovery;
} }
@ -59,11 +65,17 @@ public class NacosReactiveDiscoveryClient implements ReactiveDiscoveryClient {
private Function<String, Publisher<ServiceInstance>> loadInstancesFromNacos() { private Function<String, Publisher<ServiceInstance>> loadInstancesFromNacos() {
return serviceId -> { return serviceId -> {
try { try {
return Flux.fromIterable(serviceDiscovery.getInstances(serviceId)); return Mono.justOrEmpty(serviceDiscovery.getInstances(serviceId))
.flatMapMany(instances -> {
ServiceCache.setInstances(serviceId, instances);
return Flux.fromIterable(instances);
});
} }
catch (NacosException e) { catch (NacosException e) {
log.error("get service instance[{}] from nacos error!", serviceId, e); log.error("get service instance[{}] from nacos error!", serviceId, e);
return Flux.empty(); return failureToleranceEnabled
? Flux.fromIterable(ServiceCache.getInstances(serviceId))
: Flux.empty();
} }
}; };
} }
@ -72,11 +84,17 @@ public class NacosReactiveDiscoveryClient implements ReactiveDiscoveryClient {
public Flux<String> getServices() { public Flux<String> getServices() {
return Flux.defer(() -> { return Flux.defer(() -> {
try { try {
return Flux.fromIterable(serviceDiscovery.getServices()); return Mono.justOrEmpty(serviceDiscovery.getServices())
.flatMapMany(services -> {
ServiceCache.set(services);
return Flux.fromIterable(services);
});
} }
catch (Exception e) { catch (Exception e) {
log.error("get services from nacos server fail,", e); log.error("get services from nacos server fail,", e);
return Flux.empty(); return failureToleranceEnabled
? Flux.fromIterable(ServiceCache.get())
: Flux.empty();
} }
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(Schedulers.boundedElastic());
} }

@ -74,5 +74,11 @@
"type": "java.lang.Boolean", "type": "java.lang.Boolean",
"defaultValue": false, "defaultValue": false,
"description": "Integrate LoadBalancer or not." "description": "Integrate LoadBalancer or not."
},
{
"name": "spring.cloud.nacos.discovery.failure-tolerance-enabled",
"type": "java.lang.Boolean",
"defaultValue": true,
"description": "Whether to enable nacos failure tolerance. If enabled, nacos will return cached values when exceptions occur."
} }
]} ]}

@ -16,10 +16,13 @@
package com.alibaba.cloud.nacos; package com.alibaba.cloud.nacos;
import java.util.Arrays;
import java.util.List; import java.util.List;
import com.alibaba.cloud.nacos.discovery.NacosDiscoveryClient; import com.alibaba.cloud.nacos.discovery.NacosDiscoveryClient;
import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery; import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery;
import com.alibaba.cloud.nacos.discovery.ServiceCache;
import com.alibaba.nacos.api.exception.NacosException;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
@ -27,14 +30,18 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
import org.springframework.test.util.ReflectionTestUtils;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
/** /**
* @author xiaojing * @author xiaojing
* @author echooymxq * @author echooymxq
* @author freeman
*/ */
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class NacosDiscoveryClientTests { public class NacosDiscoveryClientTests {
@ -71,4 +78,56 @@ public class NacosDiscoveryClientTests {
} }
@Test
public void testGetInstancesFailureToleranceEnabled() throws NacosException {
ServiceCache.setInstances("a", singletonList(serviceInstance));
when(serviceDiscovery.getInstances("a")).thenThrow(new NacosException());
List<ServiceInstance> instances = this.client.getInstances("a");
assertThat(instances).isEqualTo(singletonList(serviceInstance));
}
@Test
public void testGetInstancesFailureToleranceDisabled() throws NacosException {
ServiceCache.setInstances("a", singletonList(serviceInstance));
when(serviceDiscovery.getInstances("a")).thenThrow(new NacosException());
ReflectionTestUtils.setField(client, "failureToleranceEnabled", false);
assertThatThrownBy(() -> this.client.getInstances("a"));
}
@Test
public void testFailureToleranceEnabled() throws NacosException {
ServiceCache.set(Arrays.asList("a", "b"));
when(serviceDiscovery.getServices()).thenThrow(new NacosException());
List<String> services = this.client.getServices();
assertThat(services).isEqualTo(Arrays.asList("a", "b"));
}
@Test
public void testFailureToleranceDisabled() throws NacosException {
ServiceCache.set(Arrays.asList("a", "b"));
when(serviceDiscovery.getServices()).thenThrow(new NacosException());
ReflectionTestUtils.setField(client, "failureToleranceEnabled", false);
List<String> services = this.client.getServices();
assertThat(services).isEqualTo(emptyList());
}
@Test
public void testCacheIsOK() throws NacosException {
when(serviceDiscovery.getInstances("a"))
.thenReturn(singletonList(serviceInstance));
this.client.getInstances("a");
assertThat(ServiceCache.getInstances("a")).isEqualTo(singletonList(serviceInstance));
}
} }

@ -19,6 +19,7 @@ package com.alibaba.cloud.nacos.discovery.reactive;
import java.util.Arrays; import java.util.Arrays;
import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery; import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery;
import com.alibaba.cloud.nacos.discovery.ServiceCache;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -29,12 +30,14 @@ import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
import org.springframework.test.util.ReflectionTestUtils;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
/** /**
* @author <a href="mailto:echooy.mxq@gmail.com">echooymxq</a> * @author <a href="mailto:echooy.mxq@gmail.com">echooymxq</a>
* @author freeman
**/ **/
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
class NacosReactiveDiscoveryClientTests { class NacosReactiveDiscoveryClientTests {
@ -71,4 +74,69 @@ class NacosReactiveDiscoveryClientTests {
.expectComplete().verify(); .expectComplete().verify();
} }
@Test
public void testGetInstancesFailureToleranceEnabled() throws NacosException {
ServiceCache.setInstances("a", singletonList(serviceInstance));
when(serviceDiscovery.getInstances("a")).thenThrow(new NacosException());
Flux<ServiceInstance> instances = this.client.getInstances("a");
StepVerifier.create(instances).expectNext(serviceInstance)
.expectComplete().verify();
}
@Test
public void testGetInstancesFailureToleranceDisabled() throws NacosException {
ServiceCache.setInstances("a", singletonList(serviceInstance));
when(serviceDiscovery.getInstances("a")).thenThrow(new NacosException());
ReflectionTestUtils.setField(client, "failureToleranceEnabled", false);
Flux<ServiceInstance> instances = this.client.getInstances("a");
StepVerifier.create(instances).expectComplete().verify();
}
@Test
public void testFailureToleranceEnabled() throws NacosException {
ServiceCache.set(Arrays.asList("a", "b"));
when(serviceDiscovery.getServices()).thenThrow(new NacosException());
Flux<String> services = this.client.getServices();
StepVerifier.create(services).expectNext("a", "b")
.expectComplete().verify();
}
@Test
public void testFailureToleranceDisabled() throws NacosException {
ServiceCache.set(Arrays.asList("a", "b"));
when(serviceDiscovery.getServices()).thenThrow(new NacosException());
ReflectionTestUtils.setField(client, "failureToleranceEnabled", false);
Flux<String> services = this.client.getServices();
StepVerifier.create(services).expectComplete().verify();
}
@Test
public void testCacheIsOK() throws NacosException, InterruptedException {
when(serviceDiscovery.getInstances("a"))
.thenReturn(singletonList(serviceInstance));
Flux<ServiceInstance> instances = this.client.getInstances("a");
instances = instances.doOnComplete(() -> {
if (!ServiceCache.getInstances("a").equals(singletonList(serviceInstance))) {
throw new RuntimeException();
}
});
StepVerifier.create(instances)
.expectNext(serviceInstance)
.expectComplete().verify();
}
} }

Loading…
Cancel
Save