mirror of https://github.com/alibaba/arthas.git
tunnel server/client support http proxy. #1553
parent
79c27ad13c
commit
32d4321a99
@ -0,0 +1,157 @@
|
||||
package com.alibaba.arthas.tunnel.client;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.alibaba.arthas.tunnel.common.SimpleHttpResponse;
|
||||
import com.taobao.arthas.common.ArthasConstants;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.local.LocalAddress;
|
||||
import io.netty.channel.local.LocalChannel;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||
import io.netty.handler.codec.http.HttpClientCodec;
|
||||
import io.netty.handler.codec.http.HttpContent;
|
||||
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||
import io.netty.handler.codec.http.HttpHeaderValues;
|
||||
import io.netty.handler.codec.http.HttpMethod;
|
||||
import io.netty.handler.codec.http.HttpObject;
|
||||
import io.netty.handler.codec.http.HttpObjectAggregator;
|
||||
import io.netty.handler.codec.http.HttpRequest;
|
||||
import io.netty.handler.codec.http.HttpResponse;
|
||||
import io.netty.handler.codec.http.HttpVersion;
|
||||
import io.netty.handler.codec.http.LastHttpContent;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author hengyunabc 2020-10-22
|
||||
*
|
||||
*/
|
||||
public class ProxyClient {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ProxyClient.class);
|
||||
|
||||
public SimpleHttpResponse query(String targetUrl) throws InterruptedException {
|
||||
final Promise<SimpleHttpResponse> httpResponsePromise = GlobalEventExecutor.INSTANCE.newPromise();
|
||||
|
||||
final EventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("arthas-ProxyClient", true));
|
||||
ChannelFuture closeFuture = null;
|
||||
try {
|
||||
Bootstrap b = new Bootstrap();
|
||||
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
|
||||
b.group(group).channel(LocalChannel.class).handler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
protected void initChannel(LocalChannel ch) {
|
||||
ChannelPipeline p = ch.pipeline();
|
||||
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(ArthasConstants.MAX_HTTP_CONTENT_LENGTH),
|
||||
new HttpProxyClientHandler(httpResponsePromise));
|
||||
}
|
||||
});
|
||||
|
||||
LocalAddress localAddress = new LocalAddress(ArthasConstants.NETTY_LOCAL_ADDRESS);
|
||||
Channel localChannel = b.connect(localAddress).sync().channel();
|
||||
|
||||
// Prepare the HTTP request.
|
||||
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, targetUrl,
|
||||
Unpooled.EMPTY_BUFFER);
|
||||
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
|
||||
|
||||
localChannel.writeAndFlush(request);
|
||||
|
||||
closeFuture = localChannel.closeFuture();
|
||||
logger.info("proxy client connect to server success, targetUrl: " + targetUrl);
|
||||
|
||||
return httpResponsePromise.get(5000, TimeUnit.MILLISECONDS);
|
||||
} catch (Throwable e) {
|
||||
logger.error("ProxyClient error, targetUrl: {}", targetUrl, e);
|
||||
} finally {
|
||||
if (closeFuture != null) {
|
||||
closeFuture.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
SimpleHttpResponse httpResponse = new SimpleHttpResponse();
|
||||
try {
|
||||
httpResponse.setContent(new String("error").getBytes("utf-8"));
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
// ignore
|
||||
}
|
||||
return httpResponse;
|
||||
}
|
||||
|
||||
class HttpProxyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
|
||||
|
||||
private Promise<SimpleHttpResponse> promise;
|
||||
|
||||
private SimpleHttpResponse simpleHttpResponse = new SimpleHttpResponse();
|
||||
|
||||
public HttpProxyClientHandler(Promise<SimpleHttpResponse> promise) {
|
||||
this.promise = promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
|
||||
if (msg instanceof HttpResponse) {
|
||||
HttpResponse response = (HttpResponse) msg;
|
||||
|
||||
simpleHttpResponse.setStatus(response.status().code());
|
||||
if (!response.headers().isEmpty()) {
|
||||
for (String name : response.headers().names()) {
|
||||
for (String value : response.headers().getAll(name)) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("header: {}, value: {}", name, value);
|
||||
}
|
||||
|
||||
simpleHttpResponse.addHeader(name, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (msg instanceof HttpContent) {
|
||||
HttpContent content = (HttpContent) msg;
|
||||
|
||||
ByteBuf byteBuf = content.content();
|
||||
byte[] bytes = new byte[byteBuf.readableBytes()];
|
||||
byteBuf.readBytes(bytes);
|
||||
|
||||
simpleHttpResponse.setContent(bytes);
|
||||
|
||||
promise.setSuccess(simpleHttpResponse);
|
||||
|
||||
if (content instanceof LastHttpContent) {
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
logger.error("Proxy Client error", cause);
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,89 @@
|
||||
package com.alibaba.arthas.tunnel.common;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author hengyunabc 2020-10-22
|
||||
*
|
||||
*/
|
||||
public class SimpleHttpResponse implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private int status = 200;
|
||||
|
||||
private Map<String, String> headers = new HashMap<String, String>();
|
||||
|
||||
private byte[] content;
|
||||
|
||||
public void addHeader(String key, String value) {
|
||||
headers.put(key, value);
|
||||
}
|
||||
|
||||
public Map<String, String> getHeaders() {
|
||||
return headers;
|
||||
}
|
||||
|
||||
public void setHeaders(Map<String, String> headers) {
|
||||
this.headers = headers;
|
||||
}
|
||||
|
||||
public byte[] getContent() {
|
||||
return content;
|
||||
}
|
||||
|
||||
public void setContent(byte[] content) {
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
public int getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(int status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public static byte[] toBytes(SimpleHttpResponse response) throws IOException {
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
ObjectOutputStream out = null;
|
||||
try {
|
||||
out = new ObjectOutputStream(bos);
|
||||
out.writeObject(response);
|
||||
out.flush();
|
||||
return bos.toByteArray();
|
||||
} finally {
|
||||
try {
|
||||
bos.close();
|
||||
} catch (IOException ex) {
|
||||
// ignore close exception
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static SimpleHttpResponse fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
|
||||
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
|
||||
ObjectInput in = null;
|
||||
try {
|
||||
in = new ObjectInputStream(bis);
|
||||
return (SimpleHttpResponse) in.readObject();
|
||||
} finally {
|
||||
try {
|
||||
if (in != null) {
|
||||
in.close();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
// ignore close exception
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,92 @@
|
||||
package com.alibaba.arthas.tunnel.server.app.web;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.ResponseEntity.BodyBuilder;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.servlet.HandlerMapping;
|
||||
import org.springframework.web.util.UriComponentsBuilder;
|
||||
|
||||
import com.alibaba.arthas.tunnel.common.MethodConstants;
|
||||
import com.alibaba.arthas.tunnel.common.SimpleHttpResponse;
|
||||
import com.alibaba.arthas.tunnel.common.URIConstans;
|
||||
import com.alibaba.arthas.tunnel.server.AgentInfo;
|
||||
import com.alibaba.arthas.tunnel.server.TunnelServer;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
/**
|
||||
* 代理http请求到具体的 arthas agent里
|
||||
*
|
||||
* @author hengyunabc 2020-10-22
|
||||
*
|
||||
*/
|
||||
@Controller
|
||||
public class ProxyController {
|
||||
private final static Logger logger = LoggerFactory.getLogger(ProxyController.class);
|
||||
|
||||
@Autowired
|
||||
TunnelServer tunnelServer;
|
||||
|
||||
@RequestMapping(value = "/proxy/{agentId}/**")
|
||||
@ResponseBody
|
||||
public ResponseEntity<?> execute(@PathVariable(name = "agentId", required = true) String agentId,
|
||||
HttpServletRequest request) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
|
||||
String fullPath = (String) request.getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE);
|
||||
String targetUrl = fullPath.substring("/proxy/".length() + agentId.length());
|
||||
|
||||
logger.info("http proxy, agentId: {}, targetUrl: {}", agentId, targetUrl);
|
||||
|
||||
Optional<AgentInfo> findAgent = tunnelServer.findAgent(agentId);
|
||||
|
||||
if (findAgent.isPresent()) {
|
||||
String requestId = RandomStringUtils.random(20, true, true).toUpperCase();
|
||||
|
||||
ChannelHandlerContext agentCtx = findAgent.get().getChannelHandlerContext();
|
||||
|
||||
Promise<SimpleHttpResponse> httpResponsePromise = GlobalEventExecutor.INSTANCE.newPromise();
|
||||
|
||||
tunnelServer.addProxyRequestPromise(requestId, httpResponsePromise);
|
||||
|
||||
URI uri = UriComponentsBuilder.newInstance().scheme(URIConstans.RESPONSE).path("/")
|
||||
.queryParam(URIConstans.METHOD, MethodConstants.HTTP_PROXY).queryParam(URIConstans.ID, agentId)
|
||||
.queryParam(URIConstans.TARGET_URL, targetUrl).queryParam(URIConstans.PROXY_REQUEST_ID, requestId)
|
||||
.build().toUri();
|
||||
|
||||
agentCtx.channel().writeAndFlush(new TextWebSocketFrame(uri.toString()));
|
||||
logger.info("waitting for arthas agent http proxy, agentId: {}, targetUrl: {}", agentId, targetUrl);
|
||||
|
||||
SimpleHttpResponse simpleHttpResponse = httpResponsePromise.get(15, TimeUnit.SECONDS);
|
||||
|
||||
BodyBuilder bodyBuilder = ResponseEntity.status(simpleHttpResponse.getStatus());
|
||||
for (Entry<String, String> entry : simpleHttpResponse.getHeaders().entrySet()) {
|
||||
bodyBuilder.header(entry.getKey(), entry.getValue());
|
||||
}
|
||||
ResponseEntity<byte[]> responseEntity = bodyBuilder.body(simpleHttpResponse.getContent());
|
||||
return responseEntity;
|
||||
} else {
|
||||
logger.error("can not find agent by agentId: {}", agentId);
|
||||
}
|
||||
|
||||
return ResponseEntity.notFound().build();
|
||||
}
|
||||
}
|
@ -0,0 +1,67 @@
|
||||
package com.alibaba.arthas.tunnel.server;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.springframework.web.util.UriComponentsBuilder;
|
||||
|
||||
import com.alibaba.arthas.tunnel.common.MethodConstants;
|
||||
import com.alibaba.arthas.tunnel.common.URIConstans;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author hengyunabc 2020-10-22
|
||||
*
|
||||
*/
|
||||
public class URITest {
|
||||
@Test
|
||||
public void test() throws URISyntaxException {
|
||||
String id = "xxx";
|
||||
URI responseUri = new URI("response", null, "/", "method=" + MethodConstants.AGENT_REGISTER + "&id=" + id,
|
||||
null);
|
||||
|
||||
String string = responseUri.toString();
|
||||
|
||||
String uriString = UriComponentsBuilder.newInstance().scheme("response").path("/")
|
||||
.queryParam("method", MethodConstants.AGENT_REGISTER).queryParam("id", id).build().toUriString();
|
||||
|
||||
Assertions.assertThat(string).isEqualTo(uriString).isEqualTo("response:/?method=agentRegister&id=xxx");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncode() throws URISyntaxException {
|
||||
String id = "xxx/%ff#ff";
|
||||
URI responseUri = new URI("response", null, "/", "method=" + MethodConstants.AGENT_REGISTER + "&id=" + id,
|
||||
null);
|
||||
|
||||
String string = responseUri.toString();
|
||||
|
||||
String uriString = UriComponentsBuilder.newInstance().scheme(URIConstans.RESPONSE).path("/")
|
||||
.queryParam(URIConstans.METHOD, MethodConstants.AGENT_REGISTER).queryParam(URIConstans.ID, id).build()
|
||||
.encode().toUriString();
|
||||
|
||||
Assertions.assertThat(string).isEqualTo(uriString)
|
||||
.isEqualTo("response:/?method=agentRegister&id=xxx/%25ff%23ff");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test3() throws URISyntaxException {
|
||||
|
||||
String agentId = "ffff";
|
||||
String clientConnectionId = "ccccc";
|
||||
URI uri = new URI("response", null, "/", "method=" + MethodConstants.START_TUNNEL + "&id=" + agentId
|
||||
+ "&clientConnectionId=" + clientConnectionId, null);
|
||||
|
||||
String string = uri.toString();
|
||||
|
||||
String uriString = UriComponentsBuilder.newInstance().scheme(URIConstans.RESPONSE).path("/")
|
||||
.queryParam(URIConstans.METHOD, MethodConstants.START_TUNNEL).queryParam(URIConstans.ID, agentId)
|
||||
.queryParam(URIConstans.CLIENT_CONNECTION_ID, clientConnectionId).build().toUriString();
|
||||
|
||||
System.err.println(string);
|
||||
Assertions.assertThat(string).isEqualTo(uriString)
|
||||
.isEqualTo("response:/?method=startTunnel&id=ffff&clientConnectionId=ccccc");
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue