From d79207c64b65680e45c399cb31f19e8b176e5047 Mon Sep 17 00:00:00 2001 From: yuhuangbin Date: Wed, 4 Dec 2019 09:23:34 +0800 Subject: [PATCH] Support seata feign loadbalancer --- spring-cloud-alibaba-seata/pom.xml | 6 ++ .../SeataFeignBlockingLoadBalancerClient.java | 95 +++++++++++++++++++ .../seata/feign/SeataFeignObjectWrapper.java | 39 ++++++++ 3 files changed, 140 insertions(+) create mode 100644 spring-cloud-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/SeataFeignBlockingLoadBalancerClient.java diff --git a/spring-cloud-alibaba-seata/pom.xml b/spring-cloud-alibaba-seata/pom.xml index 5f4a2802d..216008cc0 100644 --- a/spring-cloud-alibaba-seata/pom.xml +++ b/spring-cloud-alibaba-seata/pom.xml @@ -38,6 +38,12 @@ true + + org.springframework.cloud + spring-cloud-loadbalancer + true + + org.springframework.cloud spring-cloud-starter-netflix-ribbon diff --git a/spring-cloud-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/SeataFeignBlockingLoadBalancerClient.java b/spring-cloud-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/SeataFeignBlockingLoadBalancerClient.java new file mode 100644 index 000000000..3ad4a09ae --- /dev/null +++ b/spring-cloud-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/SeataFeignBlockingLoadBalancerClient.java @@ -0,0 +1,95 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.seata.feign; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import feign.Client; +import feign.Request; +import feign.Response; +import io.seata.core.context.RootContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient; +import org.springframework.http.HttpStatus; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * @author yuhuangbin + */ +public class SeataFeignBlockingLoadBalancerClient implements Client { + + private static final Log LOG = LogFactory + .getLog(SeataFeignBlockingLoadBalancerClient.class); + + private final Client delegate; + + private final BlockingLoadBalancerClient loadBalancerClient; + + SeataFeignBlockingLoadBalancerClient(Client delegate, + BlockingLoadBalancerClient loadBalancerClient) { + this.delegate = delegate; + this.loadBalancerClient = loadBalancerClient; + } + + @Override + public Response execute(Request request, Request.Options options) throws IOException { + final URI originalUri = URI.create(request.url()); + String serviceId = originalUri.getHost(); + Assert.state(serviceId != null, + "Request URI does not contain a valid hostname: " + originalUri); + ServiceInstance instance = loadBalancerClient.choose(serviceId); + if (instance == null) { + String message = "Load balancer does not contain an instance for the service " + + serviceId; + if (LOG.isWarnEnabled()) { + LOG.warn(message); + } + return Response.builder().request(request) + .status(HttpStatus.SERVICE_UNAVAILABLE.value()) + .body(message, StandardCharsets.UTF_8).build(); + } + String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri) + .toString(); + Request newRequest = Request.create(request.httpMethod(), reconstructedUrl, + enrichRequstHeader(request.headers()), request.requestBody()); + + return delegate.execute(newRequest, options); + } + + private Map> enrichRequstHeader( + Map> headers) { + String xid = RootContext.getXID(); + if (!StringUtils.isEmpty(xid)) { + Map> newHeaders = new HashMap<>(); + newHeaders.putAll(headers); + newHeaders.put(RootContext.KEY_XID, Arrays.asList(xid)); + return newHeaders; + } + return headers; + } + +} diff --git a/spring-cloud-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/SeataFeignObjectWrapper.java b/spring-cloud-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/SeataFeignObjectWrapper.java index 95c2a3f8e..58b9ef2c7 100644 --- a/spring-cloud-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/SeataFeignObjectWrapper.java +++ b/spring-cloud-alibaba-seata/src/main/java/com/alibaba/cloud/seata/feign/SeataFeignObjectWrapper.java @@ -16,9 +16,14 @@ package com.alibaba.cloud.seata.feign; +import java.lang.reflect.Field; + import feign.Client; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanFactory; +import org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient; import org.springframework.cloud.netflix.ribbon.SpringClientFactory; import org.springframework.cloud.openfeign.ribbon.CachingSpringLoadBalancerFactory; import org.springframework.cloud.openfeign.ribbon.LoadBalancerFeignClient; @@ -28,12 +33,16 @@ import org.springframework.cloud.openfeign.ribbon.LoadBalancerFeignClient; */ public class SeataFeignObjectWrapper { + private static final Log LOG = LogFactory.getLog(SeataFeignObjectWrapper.class); + private final BeanFactory beanFactory; private CachingSpringLoadBalancerFactory cachingSpringLoadBalancerFactory; private SpringClientFactory springClientFactory; + private BlockingLoadBalancerClient loadBalancerClient; + SeataFeignObjectWrapper(BeanFactory beanFactory) { this.beanFactory = beanFactory; } @@ -45,11 +54,41 @@ public class SeataFeignObjectWrapper { return new SeataLoadBalancerFeignClient(client.getDelegate(), factory(), clientFactory(), this.beanFactory); } + if (bean.getClass().getName().equals( + "org.springframework.cloud.openfeign.loadbalancer.FeignBlockingLoadBalancerClient")) { + return new SeataFeignBlockingLoadBalancerClient(getClient(bean), + loadBalancerClient()); + } return new SeataFeignClient(this.beanFactory, (Client) bean); } return bean; } + private Client getClient(Object bean) { + Field client = null; + boolean oldAccessible = false; + try { + client = bean.getClass().getDeclaredField("delegate"); + oldAccessible = client.isAccessible(); + client.setAccessible(true); + return (Client) client.get(bean); + } + catch (Exception e) { + LOG.error("get delegate client error", e); + } + finally { + client.setAccessible(oldAccessible); + } + return null; + } + + private BlockingLoadBalancerClient loadBalancerClient() { + if (this.loadBalancerClient != null) { + return this.loadBalancerClient; + } + return beanFactory.getBean(BlockingLoadBalancerClient.class); + } + CachingSpringLoadBalancerFactory factory() { if (this.cachingSpringLoadBalancerFactory == null) { this.cachingSpringLoadBalancerFactory = this.beanFactory