diff --git a/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java b/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java index 77d26452a..cd34834c8 100644 --- a/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java +++ b/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java @@ -10,6 +10,7 @@ import com.taobao.arthas.core.shell.handlers.BindHandler; import com.taobao.arthas.core.shell.impl.ShellServerImpl; import com.taobao.arthas.core.shell.term.impl.HttpTermServer; import com.taobao.arthas.core.shell.term.impl.TelnetTermServer; +import com.taobao.arthas.core.shell.term.impl.httptelnet.HttpTelnetTermServer; import com.taobao.arthas.core.util.ArthasBanner; import com.taobao.arthas.core.util.Constants; import com.taobao.arthas.core.util.LogUtil; @@ -127,7 +128,7 @@ public class ArthasBootstrap { resolvers.add(builtinCommands); // TODO: discover user provided command resolver if (configure.getTelnetPort() > 0) { - shellServer.registerTermServer(new TelnetTermServer(configure.getIp(), configure.getTelnetPort(), + shellServer.registerTermServer(new HttpTelnetTermServer(configure.getIp(), configure.getTelnetPort(), options.getConnectionTimeout())); } else { logger.info("telnet port is {}, skip bind telnet server.", configure.getTelnetPort()); diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/HttpTelnetTermServer.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/HttpTelnetTermServer.java new file mode 100644 index 000000000..7d4974f46 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/HttpTelnetTermServer.java @@ -0,0 +1,87 @@ +package com.taobao.arthas.core.shell.term.impl.httptelnet; + +import java.util.concurrent.TimeUnit; + +import com.taobao.arthas.core.shell.future.Future; +import com.taobao.arthas.core.shell.handlers.Handler; +import com.taobao.arthas.core.shell.term.Term; +import com.taobao.arthas.core.shell.term.TermServer; +import com.taobao.arthas.core.shell.term.impl.Helper; +import com.taobao.arthas.core.shell.term.impl.TermImpl; +import com.taobao.arthas.core.util.LogUtil; +import com.taobao.middleware.logger.Logger; + +import io.termd.core.function.Consumer; +import io.termd.core.tty.TtyConnection; + +/** + * both suport http/telnet + * + * @author hengyunabc 2019-11-04 + * + */ +public class HttpTelnetTermServer extends TermServer { + + private static Logger logger = LogUtil.getArthasLogger(); + + private Handler termHandler; + private NettyHttpTelnetTtyBootstrap bootstrap; + private String hostIp; + private int port; + private long connectionTimeout; + + public HttpTelnetTermServer(String hostIp, int port, long connectionTimeout) { + this.hostIp = hostIp; + this.port = port; + this.connectionTimeout = connectionTimeout; + } + + @Override + public TermServer termHandler(Handler handler) { + this.termHandler = handler; + return this; + } + + @Override + public TermServer listen(Handler> listenHandler) { + // TODO: charset and inputrc from options + bootstrap = new NettyHttpTelnetTtyBootstrap().setHost(hostIp).setPort(port); + try { + bootstrap.start(new Consumer() { + @Override + public void accept(final TtyConnection conn) { + termHandler.handle(new TermImpl(Helper.loadKeymap(), conn)); + } + }).get(connectionTimeout, TimeUnit.MILLISECONDS); + listenHandler.handle(Future.succeededFuture()); + } catch (Throwable t) { + logger.error(null, "Error listening to port " + port, t); + listenHandler.handle(Future.failedFuture(t)); + } + return this; + } + + @Override + public int actualPort() { + return bootstrap.getPort(); + } + + @Override + public void close() { + close(null); + } + + @Override + public void close(Handler> completionHandler) { + if (bootstrap != null) { + bootstrap.stop(); + if (completionHandler != null) { + completionHandler.handle(Future.succeededFuture()); + } + } else { + if (completionHandler != null) { + completionHandler.handle(Future.failedFuture("telnet term server not started")); + } + } + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/NettyHttpTelnetBootstrap.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/NettyHttpTelnetBootstrap.java new file mode 100644 index 000000000..b289582f5 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/NettyHttpTelnetBootstrap.java @@ -0,0 +1,90 @@ +package com.taobao.arthas.core.shell.term.impl.httptelnet; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.termd.core.function.Consumer; +import io.termd.core.function.Supplier; +import io.termd.core.telnet.TelnetBootstrap; +import io.termd.core.telnet.TelnetHandler; +import io.termd.core.tty.TtyConnection; + +/** + * @author Julien Viet + * @author hengyunabc 2019-11-05 + */ +public class NettyHttpTelnetBootstrap extends TelnetBootstrap { + + private EventLoopGroup group; + private ChannelGroup channelGroup; + + public NettyHttpTelnetBootstrap() { + this.group = new NioEventLoopGroup(); + this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); + } + + public NettyHttpTelnetBootstrap setHost(String host) { + return (NettyHttpTelnetBootstrap) super.setHost(host); + } + + public NettyHttpTelnetBootstrap setPort(int port) { + return (NettyHttpTelnetBootstrap) super.setPort(port); + } + + @Override + public void start(Supplier factory, Consumer doneHandler) { + // ignore, never invoke + + } + + public void start(final Supplier handlerFactory, final Consumer factory, + final Consumer doneHandler) { + ServerBootstrap boostrap = new ServerBootstrap(); + boostrap.group(group).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new ProtocolDetectHandler(channelGroup, handlerFactory, factory)); + } + }); + + boostrap.bind(getHost(), getPort()).addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + doneHandler.accept(null); + } else { + doneHandler.accept(future.cause()); + } + } + }); + } + + @Override + public void stop(final Consumer doneHandler) { + GenericFutureListener> adapter = new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + try { + doneHandler.accept(future.cause()); + } finally { + group.shutdownGracefully(); + } + } + }; + channelGroup.close().addListener(adapter); + } + +} \ No newline at end of file diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/NettyHttpTelnetTtyBootstrap.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/NettyHttpTelnetTtyBootstrap.java new file mode 100644 index 000000000..100c003f4 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/NettyHttpTelnetTtyBootstrap.java @@ -0,0 +1,110 @@ +package com.taobao.arthas.core.shell.term.impl.httptelnet; + +import java.nio.charset.Charset; + +import io.termd.core.function.Consumer; +import io.termd.core.function.Supplier; +import io.termd.core.telnet.TelnetHandler; +import io.termd.core.telnet.TelnetTtyConnection; +import io.termd.core.tty.TtyConnection; +import io.termd.core.util.CompletableFuture; +import io.termd.core.util.Helper; + +/** + * @author Julien Viet + * @author hengyunabc 2019-11-05 + */ +public class NettyHttpTelnetTtyBootstrap { + + private final NettyHttpTelnetBootstrap httpTelnetTtyBootstrap; + private boolean outBinary; + private boolean inBinary; + private Charset charset = Charset.forName("UTF-8"); + + public NettyHttpTelnetTtyBootstrap() { + this.httpTelnetTtyBootstrap = new NettyHttpTelnetBootstrap(); + } + + public String getHost() { + return httpTelnetTtyBootstrap.getHost(); + } + + public NettyHttpTelnetTtyBootstrap setHost(String host) { + httpTelnetTtyBootstrap.setHost(host); + return this; + } + + public int getPort() { + return httpTelnetTtyBootstrap.getPort(); + } + + public NettyHttpTelnetTtyBootstrap setPort(int port) { + httpTelnetTtyBootstrap.setPort(port); + return this; + } + + public boolean isOutBinary() { + return outBinary; + } + + /** + * Enable or disable the TELNET BINARY option on output. + * + * @param outBinary + * true to require the client to receive binary + * @return this object + */ + public NettyHttpTelnetTtyBootstrap setOutBinary(boolean outBinary) { + this.outBinary = outBinary; + return this; + } + + public boolean isInBinary() { + return inBinary; + } + + /** + * Enable or disable the TELNET BINARY option on input. + * + * @param inBinary + * true to require the client to emit binary + * @return this object + */ + public NettyHttpTelnetTtyBootstrap setInBinary(boolean inBinary) { + this.inBinary = inBinary; + return this; + } + + public Charset getCharset() { + return charset; + } + + public void setCharset(Charset charset) { + this.charset = charset; + } + + public CompletableFuture start(Consumer factory) { + CompletableFuture fut = new CompletableFuture(); + start(factory, Helper.startedHandler(fut)); + return fut; + } + + public CompletableFuture stop() { + CompletableFuture fut = new CompletableFuture(); + stop(Helper.stoppedHandler(fut)); + return fut; + } + + public void start(final Consumer factory, Consumer doneHandler) { + httpTelnetTtyBootstrap.start(new Supplier() { + @Override + public TelnetHandler get() { + return new TelnetTtyConnection(inBinary, outBinary, charset, factory); + } + }, factory, doneHandler); + } + + public void stop(Consumer doneHandler) { + httpTelnetTtyBootstrap.stop(doneHandler); + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/ProtocolDetectHandler.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/ProtocolDetectHandler.java new file mode 100644 index 000000000..6562cfaac --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/httptelnet/ProtocolDetectHandler.java @@ -0,0 +1,87 @@ +package com.taobao.arthas.core.shell.term.impl.httptelnet; + +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.group.ChannelGroup; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.concurrent.ScheduledFuture; +import io.termd.core.function.Consumer; +import io.termd.core.function.Supplier; +import io.termd.core.http.netty.HttpRequestHandler; +import io.termd.core.http.netty.TtyWebSocketFrameHandler; +import io.termd.core.telnet.TelnetHandler; +import io.termd.core.telnet.netty.TelnetChannelHandler; +import io.termd.core.tty.TtyConnection; + +/** + * + * @author hengyunabc 2019-11-04 + * + */ +public class ProtocolDetectHandler extends ChannelInboundHandlerAdapter { + private ChannelGroup channelGroup; + private Supplier handlerFactory; + private Consumer ttyConnectionFactory; + + public ProtocolDetectHandler(ChannelGroup channelGroup, final Supplier handlerFactory, + Consumer ttyConnectionFactory) { + this.channelGroup = channelGroup; + this.handlerFactory = handlerFactory; + this.ttyConnectionFactory = ttyConnectionFactory; + } + + private ScheduledFuture detectTelnetFuture; + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + detectTelnetFuture = ctx.executor().schedule(new Runnable() { + @Override + public void run() { + channelGroup.add(ctx.channel()); + TelnetChannelHandler handler = new TelnetChannelHandler(handlerFactory); + ChannelPipeline pipeline = ctx.pipeline(); + pipeline.addLast(handler); + ctx.fireChannelActive(); + pipeline.remove(ProtocolDetectHandler.this); + } + }, 500, TimeUnit.MILLISECONDS); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf in = (ByteBuf) msg; + if (in.readableBytes() < 3) { + return; + } + + detectTelnetFuture.cancel(false); + + byte[] bytes = new byte[4]; + in.getBytes(0, bytes); + String httpHeader = new String(bytes); + + ChannelPipeline pipeline = ctx.pipeline(); + if (!"GET".equalsIgnoreCase(httpHeader)) { // telnet + channelGroup.add(ctx.channel()); + TelnetChannelHandler handler = new TelnetChannelHandler(handlerFactory); + pipeline.addLast(handler); + ctx.fireChannelActive(); // trigger TelnetChannelHandler init + } else { + pipeline.addLast(new HttpServerCodec()); + pipeline.addLast(new ChunkedWriteHandler()); + pipeline.addLast(new HttpObjectAggregator(64 * 1024)); + pipeline.addLast(new HttpRequestHandler("/ws")); + pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); + pipeline.addLast(new TtyWebSocketFrameHandler(channelGroup, ttyConnectionFactory)); + } + ctx.fireChannelRead(msg); + pipeline.remove(this); + } +}