diff --git a/common/src/main/java/com/taobao/arthas/common/ArthasConstants.java b/common/src/main/java/com/taobao/arthas/common/ArthasConstants.java index 2b0fc144f..6efcd453d 100644 --- a/common/src/main/java/com/taobao/arthas/common/ArthasConstants.java +++ b/common/src/main/java/com/taobao/arthas/common/ArthasConstants.java @@ -13,4 +13,8 @@ public class ArthasConstants { * @see io.netty.channel.local.LocalChannel */ public static final String NETTY_LOCAL_ADDRESS = "arthas-netty-LocalAddress"; + + public static int MAX_HTTP_CONTENT_LENGTH = 1024 * 1024 * 8; + + public static final String ARTHAS_OUTPUT = "arthas-output"; } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/LocalTtyServerInitializer.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/LocalTtyServerInitializer.java index 3ba279fb1..a2f2eb974 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/LocalTtyServerInitializer.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/LocalTtyServerInitializer.java @@ -14,6 +14,8 @@ import io.termd.core.tty.TtyConnection; import java.io.File; +import com.taobao.arthas.common.ArthasConstants; + /** * * @author hengyunabc 2020-09-02 @@ -38,8 +40,8 @@ public class LocalTtyServerInitializer extends ChannelInitializer ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); - pipeline.addLast(new HttpObjectAggregator(64 * 1024)); - pipeline.addLast(workerGroup, "HttpRequestHandler", new HttpRequestHandler("/ws", new File("arthas-output"))); + pipeline.addLast(new HttpObjectAggregator(ArthasConstants.MAX_HTTP_CONTENT_LENGTH)); + pipeline.addLast(workerGroup, "HttpRequestHandler", new HttpRequestHandler("/ws", new File(ArthasConstants.ARTHAS_OUTPUT))); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new TtyWebSocketFrameHandler(group, handler)); } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/TtyServerInitializer.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/TtyServerInitializer.java index 57ba3c9a6..b2b419d1d 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/TtyServerInitializer.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/TtyServerInitializer.java @@ -14,6 +14,8 @@ import io.termd.core.tty.TtyConnection; import java.io.File; +import com.taobao.arthas.common.ArthasConstants; + /** * @author Julien Viet @@ -36,8 +38,8 @@ public class TtyServerInitializer extends ChannelInitializer { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); - pipeline.addLast(new HttpObjectAggregator(64 * 1024)); - pipeline.addLast(workerGroup, "HttpRequestHandler", new HttpRequestHandler("/ws", new File("arthas-output"))); + pipeline.addLast(new HttpObjectAggregator(ArthasConstants.MAX_HTTP_CONTENT_LENGTH)); + pipeline.addLast(workerGroup, "HttpRequestHandler", new HttpRequestHandler("/ws", new File(ArthasConstants.ARTHAS_OUTPUT))); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new TtyWebSocketFrameHandler(group, handler)); } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/ProtocolDetectHandler.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/ProtocolDetectHandler.java index 8113ef9fc..81892b7a7 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/ProtocolDetectHandler.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/ProtocolDetectHandler.java @@ -3,6 +3,7 @@ package com.taobao.arthas.core.shell.term.impl.httptelnet; import java.io.File; import java.util.concurrent.TimeUnit; +import com.taobao.arthas.common.ArthasConstants; import com.taobao.arthas.core.shell.term.impl.http.HttpRequestHandler; import com.taobao.arthas.core.shell.term.impl.http.TtyWebSocketFrameHandler; @@ -84,8 +85,8 @@ public class ProtocolDetectHandler extends ChannelInboundHandlerAdapter { } else { pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); - pipeline.addLast(new HttpObjectAggregator(64 * 1024)); - pipeline.addLast(workerGroup, "HttpRequestHandler", new HttpRequestHandler("/ws", new File("arthas-output"))); + pipeline.addLast(new HttpObjectAggregator(ArthasConstants.MAX_HTTP_CONTENT_LENGTH)); + pipeline.addLast(workerGroup, "HttpRequestHandler", new HttpRequestHandler("/ws", new File(ArthasConstants.ARTHAS_OUTPUT))); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new TtyWebSocketFrameHandler(channelGroup, ttyConnectionFactory)); ctx.fireChannelActive(); diff --git a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClient.java b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClient.java index 89eaa22ae..0e671a259 100644 --- a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClient.java +++ b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClient.java @@ -8,6 +8,8 @@ import javax.net.ssl.SSLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.taobao.arthas.common.ArthasConstants; + import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -91,7 +93,7 @@ public class ForwardClient { if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); } - p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), websocketClientHandler, + p.addLast(new HttpClientCodec(), new HttpObjectAggregator(ArthasConstants.MAX_HTTP_CONTENT_LENGTH), websocketClientHandler, forwardClientSocketClientHandler); } }); diff --git a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClientSocketClientHandler.java b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClientSocketClientHandler.java index ca488a2ce..b1267fd2a 100644 --- a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClientSocketClientHandler.java +++ b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ForwardClientSocketClientHandler.java @@ -84,7 +84,7 @@ public class ForwardClientSocketClientHandler extends SimpleChannelInboundHandle @Override protected void initChannel(LocalChannel ch) { ChannelPipeline p = ch.pipeline(); - p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), websocketClientHandler, + p.addLast(new HttpClientCodec(), new HttpObjectAggregator(ArthasConstants.MAX_HTTP_CONTENT_LENGTH), websocketClientHandler, localFrameHandler); } }); diff --git a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ProxyClient.java b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ProxyClient.java new file mode 100644 index 000000000..bfa499e11 --- /dev/null +++ b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/ProxyClient.java @@ -0,0 +1,157 @@ +package com.alibaba.arthas.tunnel.client; + +import java.io.UnsupportedEncodingException; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.arthas.tunnel.common.SimpleHttpResponse; +import com.taobao.arthas.common.ArthasConstants; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; + +/** + * + * @author hengyunabc 2020-10-22 + * + */ +public class ProxyClient { + private static final Logger logger = LoggerFactory.getLogger(ProxyClient.class); + + public SimpleHttpResponse query(String targetUrl) throws InterruptedException { + final Promise httpResponsePromise = GlobalEventExecutor.INSTANCE.newPromise(); + + final EventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("arthas-ProxyClient", true)); + ChannelFuture closeFuture = null; + try { + Bootstrap b = new Bootstrap(); + b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); + b.group(group).channel(LocalChannel.class).handler(new ChannelInitializer() { + @Override + protected void initChannel(LocalChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HttpClientCodec(), new HttpObjectAggregator(ArthasConstants.MAX_HTTP_CONTENT_LENGTH), + new HttpProxyClientHandler(httpResponsePromise)); + } + }); + + LocalAddress localAddress = new LocalAddress(ArthasConstants.NETTY_LOCAL_ADDRESS); + Channel localChannel = b.connect(localAddress).sync().channel(); + + // Prepare the HTTP request. + HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, targetUrl, + Unpooled.EMPTY_BUFFER); + request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + + localChannel.writeAndFlush(request); + + closeFuture = localChannel.closeFuture(); + logger.info("proxy client connect to server success, targetUrl: " + targetUrl); + + return httpResponsePromise.get(5000, TimeUnit.MILLISECONDS); + } catch (Throwable e) { + logger.error("ProxyClient error, targetUrl: {}", targetUrl, e); + } finally { + if (closeFuture != null) { + closeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + group.shutdownGracefully(); + } + }); + } else { + group.shutdownGracefully(); + } + } + + SimpleHttpResponse httpResponse = new SimpleHttpResponse(); + try { + httpResponse.setContent(new String("error").getBytes("utf-8")); + } catch (UnsupportedEncodingException e) { + // ignore + } + return httpResponse; + } + + class HttpProxyClientHandler extends SimpleChannelInboundHandler { + + private Promise promise; + + private SimpleHttpResponse simpleHttpResponse = new SimpleHttpResponse(); + + public HttpProxyClientHandler(Promise promise) { + this.promise = promise; + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { + if (msg instanceof HttpResponse) { + HttpResponse response = (HttpResponse) msg; + + simpleHttpResponse.setStatus(response.status().code()); + if (!response.headers().isEmpty()) { + for (String name : response.headers().names()) { + for (String value : response.headers().getAll(name)) { + if (logger.isDebugEnabled()) { + logger.debug("header: {}, value: {}", name, value); + } + + simpleHttpResponse.addHeader(name, value); + } + } + } + } + if (msg instanceof HttpContent) { + HttpContent content = (HttpContent) msg; + + ByteBuf byteBuf = content.content(); + byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + + simpleHttpResponse.setContent(bytes); + + promise.setSuccess(simpleHttpResponse); + + if (content instanceof LastHttpContent) { + ctx.close(); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error("Proxy Client error", cause); + ctx.close(); + } + } +} diff --git a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/TunnelClient.java b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/TunnelClient.java index 9e917a35f..b5334357e 100644 --- a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/TunnelClient.java +++ b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/TunnelClient.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import com.alibaba.arthas.tunnel.common.MethodConstants; import com.alibaba.arthas.tunnel.common.URIConstans; +import com.taobao.arthas.common.ArthasConstants; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; @@ -115,7 +116,7 @@ public class TunnelClient { p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); } - p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), websocketClientHandler, + p.addLast(new HttpClientCodec(), new HttpObjectAggregator(ArthasConstants.MAX_HTTP_CONTENT_LENGTH), websocketClientHandler, handler); } }); diff --git a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/TunnelClientSocketClientHandler.java b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/TunnelClientSocketClientHandler.java index 60c774a15..f9595ed89 100644 --- a/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/TunnelClientSocketClientHandler.java +++ b/tunnel-client/src/main/java/com/alibaba/arthas/tunnel/client/TunnelClientSocketClientHandler.java @@ -10,16 +10,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.arthas.tunnel.common.MethodConstants; +import com.alibaba.arthas.tunnel.common.SimpleHttpResponse; import com.alibaba.arthas.tunnel.common.URIConstans; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Encoder; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.codec.http.QueryStringEncoder; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.util.CharsetUtil; /** * @@ -86,6 +92,46 @@ public class TunnelClientSocketClientHandler extends SimpleChannelInboundHandler } } + if (MethodConstants.HTTP_PROXY.equals(method)) { + /** + *
+                 * 1. 从proxy请求里读取到目标的 targetUrl,和 requestId
+                 * 2. 然后通过 ProxyClient直接请求得到结果
+                 * 3. 把response结果转为 byte[],再转为base64,再统一组合的一个url,再用 TextWebSocketFrame 发回去
+                 * 
+ * + */ + ProxyClient proxyClient = new ProxyClient(); + List targetUrls = parameters.get(URIConstans.TARGET_URL); + + List requestIDs = parameters.get(URIConstans.PROXY_REQUEST_ID); + String id = null; + if (requestIDs != null && !requestIDs.isEmpty()) { + id = requestIDs.get(0); + } + if (id == null) { + logger.error("error, http proxy need {}", URIConstans.PROXY_REQUEST_ID); + return; + } + + if (targetUrls != null && !targetUrls.isEmpty()) { + String targetUrl = targetUrls.get(0); + SimpleHttpResponse simpleHttpResponse = proxyClient.query(targetUrl); + + ByteBuf byteBuf = Base64 + .encode(Unpooled.wrappedBuffer(SimpleHttpResponse.toBytes(simpleHttpResponse))); + String requestData = byteBuf.toString(CharsetUtil.UTF_8); + + QueryStringEncoder queryEncoder = new QueryStringEncoder(""); + queryEncoder.addParam(URIConstans.METHOD, MethodConstants.HTTP_PROXY); + queryEncoder.addParam(URIConstans.PROXY_REQUEST_ID, id); + queryEncoder.addParam(URIConstans.PROXY_RESPONSE_DATA, requestData); + + String url = queryEncoder.toString(); + ctx.writeAndFlush(new TextWebSocketFrame(url)); + } + } + } } diff --git a/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/MethodConstants.java b/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/MethodConstants.java index 26162d639..9c97636fc 100644 --- a/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/MethodConstants.java +++ b/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/MethodConstants.java @@ -49,4 +49,12 @@ public class MethodConstants { * */ public static final String OPEN_TUNNEL = "openTunnel"; + + /** + *
+     * tunnel server向 tunnel client请求 http中转,比如访问 http://localhost:3658/arthas-output/xxx.svg
+     * 
+ */ + public static final String HTTP_PROXY = "httpProxy"; + } diff --git a/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/SimpleHttpResponse.java b/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/SimpleHttpResponse.java new file mode 100644 index 000000000..bdaacc20c --- /dev/null +++ b/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/SimpleHttpResponse.java @@ -0,0 +1,89 @@ +package com.alibaba.arthas.tunnel.common; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * + * @author hengyunabc 2020-10-22 + * + */ +public class SimpleHttpResponse implements Serializable { + + private static final long serialVersionUID = 1L; + + private int status = 200; + + private Map headers = new HashMap(); + + private byte[] content; + + public void addHeader(String key, String value) { + headers.put(key, value); + } + + public Map getHeaders() { + return headers; + } + + public void setHeaders(Map headers) { + this.headers = headers; + } + + public byte[] getContent() { + return content; + } + + public void setContent(byte[] content) { + this.content = content; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public static byte[] toBytes(SimpleHttpResponse response) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = null; + try { + out = new ObjectOutputStream(bos); + out.writeObject(response); + out.flush(); + return bos.toByteArray(); + } finally { + try { + bos.close(); + } catch (IOException ex) { + // ignore close exception + } + } + } + + public static SimpleHttpResponse fromBytes(byte[] bytes) throws IOException, ClassNotFoundException { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInput in = null; + try { + in = new ObjectInputStream(bis); + return (SimpleHttpResponse) in.readObject(); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (IOException ex) { + // ignore close exception + } + } + } +} diff --git a/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/URIConstans.java b/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/URIConstans.java index fd8c7a8ab..fd0c7f931 100644 --- a/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/URIConstans.java +++ b/tunnel-common/src/main/java/com/alibaba/arthas/tunnel/common/URIConstans.java @@ -22,4 +22,21 @@ public class URIConstans { * tunnel server用于区分不同 tunnel client的内部 id */ public static final String CLIENT_CONNECTION_ID = "clientConnectionId"; + + /** + * tunnel server向 tunnel client请求http代理时的目标 url + * + * @see com.alibaba.arthas.tunnel.common.MethodConstants#HTTP_PROXY + */ + public static final String TARGET_URL = "targetUrl"; + + /** + * 标识一次proxy请求,随机生成 + */ + public static final String PROXY_REQUEST_ID = "requestId"; + + /** + * proxy请求的返回值,base64编码 + */ + public static final String PROXY_RESPONSE_DATA = "responseData"; } diff --git a/tunnel-server/pom.xml b/tunnel-server/pom.xml index 65b5b8fe0..dcc398f04 100644 --- a/tunnel-server/pom.xml +++ b/tunnel-server/pom.xml @@ -32,6 +32,11 @@ + + com.taobao.arthas + arthas-common + ${project.version} + com.taobao.arthas arthas-tunnel-common diff --git a/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelServer.java b/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelServer.java index 69396a844..bff3bedab 100644 --- a/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelServer.java +++ b/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelServer.java @@ -9,6 +9,8 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.alibaba.arthas.tunnel.common.SimpleHttpResponse; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; @@ -20,6 +22,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.Promise; /** * @@ -36,6 +39,11 @@ public class TunnelServer { private Map agentInfoMap = new ConcurrentHashMap(); private Map clientConnectionInfoMap = new ConcurrentHashMap(); + + /** + * 记录 proxy request + */ + private Map> proxyRequestPromiseMap = new ConcurrentHashMap>(); private EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("arthas-TunnelServer-boss", true)); private EventLoopGroup workerGroup = new NioEventLoopGroup(new DefaultThreadFactory("arthas-TunnelServer-worker", true)); @@ -94,7 +102,7 @@ public class TunnelServer { public AgentInfo removeAgent(String id) { return agentInfoMap.remove(id); } - + public Optional findClientConnection(String id) { return Optional.ofNullable(this.clientConnectionInfoMap.get(id)); } @@ -106,6 +114,27 @@ public class TunnelServer { public ClientConnectionInfo removeClientConnectionInfo(String id) { return this.clientConnectionInfoMap.remove(id); } + + public void addProxyRequestPromise(String requestId, Promise promise) { + this.proxyRequestPromiseMap.put(requestId, promise); + // 把过期的proxy 请求删掉 + workerGroup.schedule(new Runnable() { + + @Override + public void run() { + removeProxyRequestPromise(requestId); + } + + }, 60, TimeUnit.SECONDS); + } + + public void removeProxyRequestPromise(String requestId) { + this.proxyRequestPromiseMap.remove(requestId); + } + + public Promise findProxyRequestPromise(String requestId) { + return this.proxyRequestPromiseMap.get(requestId); + } public boolean isSsl() { return ssl; diff --git a/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelSocketFrameHandler.java b/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelSocketFrameHandler.java index 1f129ff4a..74a2a79a7 100644 --- a/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelSocketFrameHandler.java +++ b/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelSocketFrameHandler.java @@ -5,6 +5,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.net.URLDecoder; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -12,12 +13,14 @@ import java.util.concurrent.TimeUnit; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.tomcat.util.codec.binary.Base64; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.MultiValueMap; import org.springframework.web.util.UriComponentsBuilder; import com.alibaba.arthas.tunnel.common.MethodConstants; +import com.alibaba.arthas.tunnel.common.SimpleHttpResponse; import com.alibaba.arthas.tunnel.common.URIConstans; import io.netty.channel.Channel; @@ -75,7 +78,41 @@ public class TunnelSocketFrameHandler extends SimpleChannelInboundHandler parameters = UriComponentsBuilder.fromUriString(text).build() + .getQueryParams(); + + String method = parameters.getFirst(URIConstans.METHOD); + + /** + *
+             * 1. 之前http proxy请求已发送到 tunnel cleint,这里接收到 tunnel client的结果,并解析出SimpleHttpResponse
+             * 2. 需要据 URIConstans.PROXY_REQUEST_ID 取出当时的 Promise,再设置SimpleHttpResponse进去
+             * 
+ */ + if (MethodConstants.HTTP_PROXY.equals(method)) { + String requestId = URLDecoder.decode(parameters.getFirst(URIConstans.PROXY_REQUEST_ID), "utf-8"); + + if (requestId == null) { + logger.error("error, need {}, text: {}", URIConstans.PROXY_REQUEST_ID, text); + return; + } + logger.info("received http proxy response, requestId: {}", requestId); + + Promise promise = tunnelServer.findProxyRequestPromise(requestId); + + String data = URLDecoder.decode(parameters.getFirst(URIConstans.PROXY_RESPONSE_DATA), "utf-8"); + byte[] bytes = Base64.decodeBase64(data); + + SimpleHttpResponse simpleHttpResponse = SimpleHttpResponse.fromBytes(bytes); + promise.setSuccess(simpleHttpResponse); + } + } } private void connectArthas(ChannelHandlerContext tunnelSocketCtx, MultiValueMap parameters) diff --git a/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelSocketServerInitializer.java b/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelSocketServerInitializer.java index 5c07f9af2..de20c094e 100644 --- a/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelSocketServerInitializer.java +++ b/tunnel-server/src/main/java/com/alibaba/arthas/tunnel/server/TunnelSocketServerInitializer.java @@ -1,5 +1,7 @@ package com.alibaba.arthas.tunnel.server; +import com.taobao.arthas.common.ArthasConstants; + import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; @@ -36,7 +38,7 @@ public class TunnelSocketServerInitializer extends ChannelInitializer execute(@PathVariable(name = "agentId", required = true) String agentId, + HttpServletRequest request) throws InterruptedException, ExecutionException, TimeoutException { + + String fullPath = (String) request.getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE); + String targetUrl = fullPath.substring("/proxy/".length() + agentId.length()); + + logger.info("http proxy, agentId: {}, targetUrl: {}", agentId, targetUrl); + + Optional findAgent = tunnelServer.findAgent(agentId); + + if (findAgent.isPresent()) { + String requestId = RandomStringUtils.random(20, true, true).toUpperCase(); + + ChannelHandlerContext agentCtx = findAgent.get().getChannelHandlerContext(); + + Promise httpResponsePromise = GlobalEventExecutor.INSTANCE.newPromise(); + + tunnelServer.addProxyRequestPromise(requestId, httpResponsePromise); + + URI uri = UriComponentsBuilder.newInstance().scheme(URIConstans.RESPONSE).path("/") + .queryParam(URIConstans.METHOD, MethodConstants.HTTP_PROXY).queryParam(URIConstans.ID, agentId) + .queryParam(URIConstans.TARGET_URL, targetUrl).queryParam(URIConstans.PROXY_REQUEST_ID, requestId) + .build().toUri(); + + agentCtx.channel().writeAndFlush(new TextWebSocketFrame(uri.toString())); + logger.info("waitting for arthas agent http proxy, agentId: {}, targetUrl: {}", agentId, targetUrl); + + SimpleHttpResponse simpleHttpResponse = httpResponsePromise.get(15, TimeUnit.SECONDS); + + BodyBuilder bodyBuilder = ResponseEntity.status(simpleHttpResponse.getStatus()); + for (Entry entry : simpleHttpResponse.getHeaders().entrySet()) { + bodyBuilder.header(entry.getKey(), entry.getValue()); + } + ResponseEntity responseEntity = bodyBuilder.body(simpleHttpResponse.getContent()); + return responseEntity; + } else { + logger.error("can not find agent by agentId: {}", agentId); + } + + return ResponseEntity.notFound().build(); + } +} diff --git a/tunnel-server/src/test/java/com/alibaba/arthas/tunnel/server/URITest.java b/tunnel-server/src/test/java/com/alibaba/arthas/tunnel/server/URITest.java new file mode 100644 index 000000000..92607597a --- /dev/null +++ b/tunnel-server/src/test/java/com/alibaba/arthas/tunnel/server/URITest.java @@ -0,0 +1,67 @@ +package com.alibaba.arthas.tunnel.server; + +import java.net.URI; +import java.net.URISyntaxException; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.springframework.web.util.UriComponentsBuilder; + +import com.alibaba.arthas.tunnel.common.MethodConstants; +import com.alibaba.arthas.tunnel.common.URIConstans; + +/** + * + * @author hengyunabc 2020-10-22 + * + */ +public class URITest { + @Test + public void test() throws URISyntaxException { + String id = "xxx"; + URI responseUri = new URI("response", null, "/", "method=" + MethodConstants.AGENT_REGISTER + "&id=" + id, + null); + + String string = responseUri.toString(); + + String uriString = UriComponentsBuilder.newInstance().scheme("response").path("/") + .queryParam("method", MethodConstants.AGENT_REGISTER).queryParam("id", id).build().toUriString(); + + Assertions.assertThat(string).isEqualTo(uriString).isEqualTo("response:/?method=agentRegister&id=xxx"); + } + + @Test + public void testEncode() throws URISyntaxException { + String id = "xxx/%ff#ff"; + URI responseUri = new URI("response", null, "/", "method=" + MethodConstants.AGENT_REGISTER + "&id=" + id, + null); + + String string = responseUri.toString(); + + String uriString = UriComponentsBuilder.newInstance().scheme(URIConstans.RESPONSE).path("/") + .queryParam(URIConstans.METHOD, MethodConstants.AGENT_REGISTER).queryParam(URIConstans.ID, id).build() + .encode().toUriString(); + + Assertions.assertThat(string).isEqualTo(uriString) + .isEqualTo("response:/?method=agentRegister&id=xxx/%25ff%23ff"); + } + + @Test + public void test3() throws URISyntaxException { + + String agentId = "ffff"; + String clientConnectionId = "ccccc"; + URI uri = new URI("response", null, "/", "method=" + MethodConstants.START_TUNNEL + "&id=" + agentId + + "&clientConnectionId=" + clientConnectionId, null); + + String string = uri.toString(); + + String uriString = UriComponentsBuilder.newInstance().scheme(URIConstans.RESPONSE).path("/") + .queryParam(URIConstans.METHOD, MethodConstants.START_TUNNEL).queryParam(URIConstans.ID, agentId) + .queryParam(URIConstans.CLIENT_CONNECTION_ID, clientConnectionId).build().toUriString(); + + System.err.println(string); + Assertions.assertThat(string).isEqualTo(uriString) + .isEqualTo("response:/?method=startTunnel&id=ffff&clientConnectionId=ccccc"); + } +}