support http and telnet in one port. #316

pull/911/head
hengyunabc 5 years ago
parent 4c4fe5fc7e
commit 7ccbf12bdb

@ -10,6 +10,7 @@ import com.taobao.arthas.core.shell.handlers.BindHandler;
import com.taobao.arthas.core.shell.impl.ShellServerImpl;
import com.taobao.arthas.core.shell.term.impl.HttpTermServer;
import com.taobao.arthas.core.shell.term.impl.TelnetTermServer;
import com.taobao.arthas.core.shell.term.impl.httptelnet.HttpTelnetTermServer;
import com.taobao.arthas.core.util.ArthasBanner;
import com.taobao.arthas.core.util.Constants;
import com.taobao.arthas.core.util.LogUtil;
@ -127,7 +128,7 @@ public class ArthasBootstrap {
resolvers.add(builtinCommands);
// TODO: discover user provided command resolver
if (configure.getTelnetPort() > 0) {
shellServer.registerTermServer(new TelnetTermServer(configure.getIp(), configure.getTelnetPort(),
shellServer.registerTermServer(new HttpTelnetTermServer(configure.getIp(), configure.getTelnetPort(),
options.getConnectionTimeout()));
} else {
logger.info("telnet port is {}, skip bind telnet server.", configure.getTelnetPort());

@ -0,0 +1,87 @@
package com.taobao.arthas.core.shell.term.impl.httptelnet;
import java.util.concurrent.TimeUnit;
import com.taobao.arthas.core.shell.future.Future;
import com.taobao.arthas.core.shell.handlers.Handler;
import com.taobao.arthas.core.shell.term.Term;
import com.taobao.arthas.core.shell.term.TermServer;
import com.taobao.arthas.core.shell.term.impl.Helper;
import com.taobao.arthas.core.shell.term.impl.TermImpl;
import com.taobao.arthas.core.util.LogUtil;
import com.taobao.middleware.logger.Logger;
import io.termd.core.function.Consumer;
import io.termd.core.tty.TtyConnection;
/**
* both suport http/telnet
*
* @author hengyunabc 2019-11-04
*
*/
public class HttpTelnetTermServer extends TermServer {
private static Logger logger = LogUtil.getArthasLogger();
private Handler<Term> termHandler;
private NettyHttpTelnetTtyBootstrap bootstrap;
private String hostIp;
private int port;
private long connectionTimeout;
public HttpTelnetTermServer(String hostIp, int port, long connectionTimeout) {
this.hostIp = hostIp;
this.port = port;
this.connectionTimeout = connectionTimeout;
}
@Override
public TermServer termHandler(Handler<Term> handler) {
this.termHandler = handler;
return this;
}
@Override
public TermServer listen(Handler<Future<TermServer>> listenHandler) {
// TODO: charset and inputrc from options
bootstrap = new NettyHttpTelnetTtyBootstrap().setHost(hostIp).setPort(port);
try {
bootstrap.start(new Consumer<TtyConnection>() {
@Override
public void accept(final TtyConnection conn) {
termHandler.handle(new TermImpl(Helper.loadKeymap(), conn));
}
}).get(connectionTimeout, TimeUnit.MILLISECONDS);
listenHandler.handle(Future.<TermServer>succeededFuture());
} catch (Throwable t) {
logger.error(null, "Error listening to port " + port, t);
listenHandler.handle(Future.<TermServer>failedFuture(t));
}
return this;
}
@Override
public int actualPort() {
return bootstrap.getPort();
}
@Override
public void close() {
close(null);
}
@Override
public void close(Handler<Future<Void>> completionHandler) {
if (bootstrap != null) {
bootstrap.stop();
if (completionHandler != null) {
completionHandler.handle(Future.<Void>succeededFuture());
}
} else {
if (completionHandler != null) {
completionHandler.handle(Future.<Void>failedFuture("telnet term server not started"));
}
}
}
}

@ -0,0 +1,90 @@
package com.taobao.arthas.core.shell.term.impl.httptelnet;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.termd.core.function.Consumer;
import io.termd.core.function.Supplier;
import io.termd.core.telnet.TelnetBootstrap;
import io.termd.core.telnet.TelnetHandler;
import io.termd.core.tty.TtyConnection;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
* @author hengyunabc 2019-11-05
*/
public class NettyHttpTelnetBootstrap extends TelnetBootstrap {
private EventLoopGroup group;
private ChannelGroup channelGroup;
public NettyHttpTelnetBootstrap() {
this.group = new NioEventLoopGroup();
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
}
public NettyHttpTelnetBootstrap setHost(String host) {
return (NettyHttpTelnetBootstrap) super.setHost(host);
}
public NettyHttpTelnetBootstrap setPort(int port) {
return (NettyHttpTelnetBootstrap) super.setPort(port);
}
@Override
public void start(Supplier<TelnetHandler> factory, Consumer<Throwable> doneHandler) {
// ignore, never invoke
}
public void start(final Supplier<TelnetHandler> handlerFactory, final Consumer<TtyConnection> factory,
final Consumer<Throwable> doneHandler) {
ServerBootstrap boostrap = new ServerBootstrap();
boostrap.group(group).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolDetectHandler(channelGroup, handlerFactory, factory));
}
});
boostrap.bind(getHost(), getPort()).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
doneHandler.accept(null);
} else {
doneHandler.accept(future.cause());
}
}
});
}
@Override
public void stop(final Consumer<Throwable> doneHandler) {
GenericFutureListener<Future<Object>> adapter = new GenericFutureListener<Future<Object>>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
try {
doneHandler.accept(future.cause());
} finally {
group.shutdownGracefully();
}
}
};
channelGroup.close().addListener(adapter);
}
}

@ -0,0 +1,110 @@
package com.taobao.arthas.core.shell.term.impl.httptelnet;
import java.nio.charset.Charset;
import io.termd.core.function.Consumer;
import io.termd.core.function.Supplier;
import io.termd.core.telnet.TelnetHandler;
import io.termd.core.telnet.TelnetTtyConnection;
import io.termd.core.tty.TtyConnection;
import io.termd.core.util.CompletableFuture;
import io.termd.core.util.Helper;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
* @author hengyunabc 2019-11-05
*/
public class NettyHttpTelnetTtyBootstrap {
private final NettyHttpTelnetBootstrap httpTelnetTtyBootstrap;
private boolean outBinary;
private boolean inBinary;
private Charset charset = Charset.forName("UTF-8");
public NettyHttpTelnetTtyBootstrap() {
this.httpTelnetTtyBootstrap = new NettyHttpTelnetBootstrap();
}
public String getHost() {
return httpTelnetTtyBootstrap.getHost();
}
public NettyHttpTelnetTtyBootstrap setHost(String host) {
httpTelnetTtyBootstrap.setHost(host);
return this;
}
public int getPort() {
return httpTelnetTtyBootstrap.getPort();
}
public NettyHttpTelnetTtyBootstrap setPort(int port) {
httpTelnetTtyBootstrap.setPort(port);
return this;
}
public boolean isOutBinary() {
return outBinary;
}
/**
* Enable or disable the TELNET BINARY option on output.
*
* @param outBinary
* true to require the client to receive binary
* @return this object
*/
public NettyHttpTelnetTtyBootstrap setOutBinary(boolean outBinary) {
this.outBinary = outBinary;
return this;
}
public boolean isInBinary() {
return inBinary;
}
/**
* Enable or disable the TELNET BINARY option on input.
*
* @param inBinary
* true to require the client to emit binary
* @return this object
*/
public NettyHttpTelnetTtyBootstrap setInBinary(boolean inBinary) {
this.inBinary = inBinary;
return this;
}
public Charset getCharset() {
return charset;
}
public void setCharset(Charset charset) {
this.charset = charset;
}
public CompletableFuture<?> start(Consumer<TtyConnection> factory) {
CompletableFuture<?> fut = new CompletableFuture();
start(factory, Helper.startedHandler(fut));
return fut;
}
public CompletableFuture<?> stop() {
CompletableFuture<?> fut = new CompletableFuture();
stop(Helper.stoppedHandler(fut));
return fut;
}
public void start(final Consumer<TtyConnection> factory, Consumer<Throwable> doneHandler) {
httpTelnetTtyBootstrap.start(new Supplier<TelnetHandler>() {
@Override
public TelnetHandler get() {
return new TelnetTtyConnection(inBinary, outBinary, charset, factory);
}
}, factory, doneHandler);
}
public void stop(Consumer<Throwable> doneHandler) {
httpTelnetTtyBootstrap.stop(doneHandler);
}
}

@ -0,0 +1,87 @@
package com.taobao.arthas.core.shell.term.impl.httptelnet;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.ScheduledFuture;
import io.termd.core.function.Consumer;
import io.termd.core.function.Supplier;
import io.termd.core.http.netty.HttpRequestHandler;
import io.termd.core.http.netty.TtyWebSocketFrameHandler;
import io.termd.core.telnet.TelnetHandler;
import io.termd.core.telnet.netty.TelnetChannelHandler;
import io.termd.core.tty.TtyConnection;
/**
*
* @author hengyunabc 2019-11-04
*
*/
public class ProtocolDetectHandler extends ChannelInboundHandlerAdapter {
private ChannelGroup channelGroup;
private Supplier<TelnetHandler> handlerFactory;
private Consumer<TtyConnection> ttyConnectionFactory;
public ProtocolDetectHandler(ChannelGroup channelGroup, final Supplier<TelnetHandler> handlerFactory,
Consumer<TtyConnection> ttyConnectionFactory) {
this.channelGroup = channelGroup;
this.handlerFactory = handlerFactory;
this.ttyConnectionFactory = ttyConnectionFactory;
}
private ScheduledFuture<?> detectTelnetFuture;
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
detectTelnetFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
channelGroup.add(ctx.channel());
TelnetChannelHandler handler = new TelnetChannelHandler(handlerFactory);
ChannelPipeline pipeline = ctx.pipeline();
pipeline.addLast(handler);
ctx.fireChannelActive();
pipeline.remove(ProtocolDetectHandler.this);
}
}, 500, TimeUnit.MILLISECONDS);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
if (in.readableBytes() < 3) {
return;
}
detectTelnetFuture.cancel(false);
byte[] bytes = new byte[4];
in.getBytes(0, bytes);
String httpHeader = new String(bytes);
ChannelPipeline pipeline = ctx.pipeline();
if (!"GET".equalsIgnoreCase(httpHeader)) { // telnet
channelGroup.add(ctx.channel());
TelnetChannelHandler handler = new TelnetChannelHandler(handlerFactory);
pipeline.addLast(handler);
ctx.fireChannelActive(); // trigger TelnetChannelHandler init
} else {
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TtyWebSocketFrameHandler(channelGroup, ttyConnectionFactory));
}
ctx.fireChannelRead(msg);
pipeline.remove(this);
}
}
Loading…
Cancel
Save