simplified gRPC impl using netty (#2914)

pull/2939/head
FengYe 3 months ago committed by GitHub
parent 2858f6ee93
commit 8200afb3cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,3 @@
# Arthas Grpc
这个模块提供了一个轻量级的 Grpc 实现,目前任在开发中

@ -0,0 +1,170 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>arthas-all</artifactId>
<groupId>com.taobao.arthas</groupId>
<version>${revision}</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>arthas-grpc-server</artifactId>
<name>arthas-grpc-server</name>
<url>https://github.com/alibaba/arthas</url>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.46.0</grpc.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-codec-http2 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>4.1.72.Final</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.19.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>com.taobao.arthas</groupId>
<artifactId>arthas-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- 测试用 -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba.arthas</groupId>
<artifactId>arthas-repackage-logger</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protoSourceRoot>${basedir}/src/main/proto</protoSourceRoot>
<protocArtifact>com.google.protobuf:protoc:3.11.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.28.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
</build>
<profiles>
<profile>
<id>mac</id>
<activation>
<os>
<family>mac</family>
</os>
</activation>
<properties>
<os.detected.classifier>osx-x86_64</os.detected.classifier>
</properties>
</profile>
</profiles>
</project>

@ -0,0 +1,13 @@
package com.taobao.arthas.grpc.server;
/**
* @author: FengYe
* @date: 2024/10/13 02:40
* @description: ArthasGrpcServerBootstrap
*/
public class ArthasGrpcBootstrap {
public static void main(String[] args) {
ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(9090, null);
arthasGrpcServer.start();
}
}

@ -0,0 +1,73 @@
package com.taobao.arthas.grpc.server;
import com.alibaba.arthas.deps.ch.qos.logback.classic.Level;
import com.alibaba.arthas.deps.ch.qos.logback.classic.LoggerContext;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.grpc.server.handler.GrpcDispatcher;
import com.taobao.arthas.grpc.server.handler.Http2Handler;
import com.taobao.arthas.grpc.server.handler.executor.GrpcExecutorFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.lang.invoke.MethodHandles;
/**
* @author: FengYe
* @date: 2024/7/3 12:30
* @description: ArthasGrpcServer
*/
public class ArthasGrpcServer {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
private int port = 9090;
private String grpcServicePackageName;
public ArthasGrpcServer(int port, String grpcServicePackageName) {
this.port = port;
this.grpcServicePackageName = grpcServicePackageName;
}
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(10);
GrpcDispatcher grpcDispatcher = new GrpcDispatcher();
grpcDispatcher.loadGrpcService(grpcServicePackageName);
GrpcExecutorFactory grpcExecutorFactory = new GrpcExecutorFactory();
grpcExecutorFactory.loadExecutor(grpcDispatcher);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(Http2FrameCodecBuilder.forServer().build());
ch.pipeline().addLast(new Http2Handler(grpcDispatcher, grpcExecutorFactory));
}
});
Channel channel = b.bind(port).sync().channel();
logger.info("ArthasGrpcServer start successfully on port: {}", port);
channel.closeFuture().sync();
} catch (InterruptedException e) {
logger.error("ArthasGrpcServer start error", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

@ -0,0 +1,198 @@
package com.taobao.arthas.grpc.server.handler;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.grpc.server.handler.annotation.GrpcMethod;
import com.taobao.arthas.grpc.server.handler.annotation.GrpcService;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import com.taobao.arthas.grpc.server.utils.ReflectUtil;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* @author: FengYe
* @date: 2024/9/6 01:12
* @description: GrpcDelegrate
*/
public class GrpcDispatcher {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
public static final String DEFAULT_GRPC_SERVICE_PACKAGE_NAME = "com.taobao.arthas.grpc.server.service.impl";
public static Map<String, MethodHandle> grpcInvokeMap = new HashMap<>();
// public static Map<String, StreamObserver> clientStreamInvokeMap = new HashMap<>();
public static Map<String, MethodHandle> requestParseFromMap = new HashMap<>();
public static Map<String, MethodHandle> requestToByteArrayMap = new HashMap<>();
public static Map<String, MethodHandle> responseParseFromMap = new HashMap<>();
public static Map<String, MethodHandle> responseToByteArrayMap = new HashMap<>();
public static Map<String, GrpcInvokeTypeEnum> grpcInvokeTypeMap = new HashMap<>();
public void loadGrpcService(String grpcServicePackageName) {
List<Class<?>> classes = ReflectUtil.findClasses(Optional.ofNullable(grpcServicePackageName).orElse(DEFAULT_GRPC_SERVICE_PACKAGE_NAME));
for (Class<?> clazz : classes) {
if (clazz.isAnnotationPresent(GrpcService.class)) {
try {
// 处理 service
GrpcService grpcService = clazz.getAnnotation(GrpcService.class);
Object instance = clazz.getDeclaredConstructor().newInstance();
// 处理 method
MethodHandles.Lookup lookup = MethodHandles.lookup();
Method[] declaredMethods = clazz.getDeclaredMethods();
for (Method method : declaredMethods) {
if (method.isAnnotationPresent(GrpcMethod.class)) {
GrpcMethod grpcMethod = method.getAnnotation(GrpcMethod.class);
MethodHandle grpcInvoke = lookup.unreflect(method);
String grpcMethodKey = generateGrpcMethodKey(grpcService.value(), grpcMethod.value());
grpcInvokeTypeMap.put(grpcMethodKey, grpcMethod.grpcType());
grpcInvokeMap.put(grpcMethodKey, grpcInvoke.bindTo(instance));
Class<?> requestClass = null;
Class<?> responseClass = null;
if (GrpcInvokeTypeEnum.UNARY.equals(grpcMethod.grpcType())) {
requestClass = grpcInvoke.type().parameterType(1);
responseClass = grpcInvoke.type().returnType();
} else if (GrpcInvokeTypeEnum.CLIENT_STREAM.equals(grpcMethod.grpcType()) || GrpcInvokeTypeEnum.BI_STREAM.equals(grpcMethod.grpcType())) {
responseClass = getInnerGenericClass(method.getGenericParameterTypes()[0]);
requestClass = getInnerGenericClass(method.getGenericReturnType());
} else if (GrpcInvokeTypeEnum.SERVER_STREAM.equals(grpcMethod.grpcType())) {
requestClass = getInnerGenericClass(method.getGenericParameterTypes()[0]);
responseClass = getInnerGenericClass(method.getGenericParameterTypes()[1]);
}
MethodHandle requestParseFrom = lookup.findStatic(requestClass, "parseFrom", MethodType.methodType(requestClass, byte[].class));
MethodHandle responseParseFrom = lookup.findStatic(responseClass, "parseFrom", MethodType.methodType(responseClass, byte[].class));
MethodHandle requestToByteArray = lookup.findVirtual(requestClass, "toByteArray", MethodType.methodType(byte[].class));
MethodHandle responseToByteArray = lookup.findVirtual(responseClass, "toByteArray", MethodType.methodType(byte[].class));
requestParseFromMap.put(grpcMethodKey, requestParseFrom);
responseParseFromMap.put(grpcMethodKey, responseParseFrom);
requestToByteArrayMap.put(grpcMethodKey, requestToByteArray);
responseToByteArrayMap.put(grpcMethodKey, responseToByteArray);
// switch (grpcMethod.grpcType()) {
// case UNARY:
// unaryInvokeMap.put(grpcMethodKey, grpcInvoke.bindTo(instance));
// return;
// case CLIENT_STREAM:
// Object invoke = grpcInvoke.bindTo(instance).invoke();
// if (!(invoke instanceof StreamObserver)) {
// throw new RuntimeException(grpcMethodKey + " return class is not StreamObserver!");
// }
// clientStreamInvokeMap.put(grpcMethodKey, (StreamObserver) invoke);
// return;
// case SERVER_STREAM:
// return;
// case BI_STREAM:
// return;
// }
}
}
} catch (Throwable e) {
logger.error("GrpcDispatcher loadGrpcService error.", e);
}
}
}
}
public GrpcResponse doUnaryExecute(String service, String method, byte[] arg) throws Throwable {
MethodHandle methodHandle = grpcInvokeMap.get(generateGrpcMethodKey(service, method));
MethodType type = grpcInvokeMap.get(generateGrpcMethodKey(service, method)).type();
Object req = requestParseFromMap.get(generateGrpcMethodKey(service, method)).invoke(arg);
Object execute = methodHandle.invoke(req);
GrpcResponse grpcResponse = new GrpcResponse();
grpcResponse.setClazz(type.returnType());
grpcResponse.setService(service);
grpcResponse.setMethod(method);
grpcResponse.writeResponseData(execute);
return grpcResponse;
}
public GrpcResponse unaryExecute(GrpcRequest request) throws Throwable {
MethodHandle methodHandle = grpcInvokeMap.get(request.getGrpcMethodKey());
MethodType type = grpcInvokeMap.get(request.getGrpcMethodKey()).type();
Object req = requestParseFromMap.get(request.getGrpcMethodKey()).invoke(request.readData());
Object execute = methodHandle.invoke(req);
GrpcResponse grpcResponse = new GrpcResponse();
grpcResponse.setClazz(type.returnType());
grpcResponse.setService(request.getService());
grpcResponse.setMethod(request.getMethod());
grpcResponse.writeResponseData(execute);
return grpcResponse;
}
public StreamObserver<GrpcRequest> clientStreamExecute(GrpcRequest request, StreamObserver<GrpcResponse> responseObserver) throws Throwable {
MethodHandle methodHandle = grpcInvokeMap.get(request.getGrpcMethodKey());
return (StreamObserver<GrpcRequest>) methodHandle.invoke(responseObserver);
}
public void serverStreamExecute(GrpcRequest request, StreamObserver<GrpcResponse> responseObserver) throws Throwable {
MethodHandle methodHandle = grpcInvokeMap.get(request.getGrpcMethodKey());
Object req = requestParseFromMap.get(request.getGrpcMethodKey()).invoke(request.readData());
methodHandle.invoke(req, responseObserver);
}
public StreamObserver<GrpcRequest> biStreamExecute(GrpcRequest request, StreamObserver<GrpcResponse> responseObserver) throws Throwable {
MethodHandle methodHandle = grpcInvokeMap.get(request.getGrpcMethodKey());
return (StreamObserver<GrpcRequest>) methodHandle.invoke(responseObserver);
}
/**
* service method
*
* @param serviceName
* @param methodName
* @return
*/
public static Class<?> getRequestClass(String serviceName, String methodName) {
//protobuf 规范只能有单入参
return Optional.ofNullable(grpcInvokeMap.get(generateGrpcMethodKey(serviceName, methodName))).orElseThrow(() -> new RuntimeException("The specified grpc method does not exist")).type().parameterArray()[0];
}
public static String generateGrpcMethodKey(String serviceName, String methodName) {
return serviceName + "." + methodName;
}
public static void checkGrpcType(GrpcRequest request) {
request.setGrpcType(
Optional.ofNullable(grpcInvokeTypeMap.get(generateGrpcMethodKey(request.getService(), request.getMethod())))
.orElse(GrpcInvokeTypeEnum.UNARY)
);
request.setStreamFirstData(true);
}
public static Class<?> getInnerGenericClass(Type type) {
if (type instanceof Class<?>) {
return (Class<?>) type;
}
if (type instanceof ParameterizedType) {
ParameterizedType paramType = (ParameterizedType) type;
Type[] actualTypeArguments = paramType.getActualTypeArguments();
if (actualTypeArguments.length > 0) {
Type innerType = actualTypeArguments[0]; // 获取第一个实际类型参数
if (innerType instanceof ParameterizedType) {
return getInnerGenericClass(innerType); // 递归调用获取最内层类型
} else if (innerType instanceof Class) {
return (Class<?>) innerType; // 直接返回 Class 类型
}
}
}
return null; // 如果没有找到对应的类型
}
}

@ -0,0 +1,193 @@
package com.taobao.arthas.grpc.server.handler;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import com.taobao.arthas.grpc.server.utils.ByteUtil;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http2.Http2Headers;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
/**
* @author: FengYe
* @date: 2024/9/4 23:07
* @description: GrpcRequest grpc
*/
public class GrpcRequest<T> {
/**
* streamId
*/
private Integer streamId;
/**
* service
*/
private String service;
/**
* method
*/
private String method;
/**
* grpc body body 5 byte boolean compressed - int length
*/
private ByteBuf byteData;
/**
*
*/
private int length;
/**
* class
*/
private Class<?> clazz;
/**
* grpc
*/
private boolean stream;
/**
* grpc data
*/
private boolean streamFirstData;
/**
* http2 headers
*/
private Http2Headers headers;
/**
* grpc
*/
private GrpcInvokeTypeEnum grpcType;
public GrpcRequest(Integer streamId, String path, String method) {
this.streamId = streamId;
this.service = path;
this.method = method;
this.byteData = ByteUtil.newByteBuf();
}
public void writeData(ByteBuf byteBuf) {
byte[] bytes = ByteUtil.getBytes(byteBuf);
if (bytes.length == 0) {
return;
}
byte[] decompressedData = decompressGzip(bytes);
if (decompressedData == null) {
return;
}
byteData.writeBytes(ByteUtil.newByteBuf(decompressedData));
}
/**
*
*
* @return
*/
public synchronized byte[] readData() {
if (byteData.readableBytes() == 0) {
return null;
}
boolean compressed = byteData.readBoolean();
int length = byteData.readInt();
byte[] bytes = new byte[length];
byteData.readBytes(bytes);
return bytes;
}
public void clearData() {
byteData.clear();
}
private byte[] decompressGzip(byte[] compressedData) {
boolean isGzip = (compressedData.length > 2 && (compressedData[0] & 0xff) == 0x1f && (compressedData[1] & 0xff) == 0x8b);
if (isGzip) {
try (InputStream byteStream = new ByteArrayInputStream(compressedData);
GZIPInputStream gzipStream = new GZIPInputStream(byteStream);
ByteArrayOutputStream out = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int len;
while ((len = gzipStream.read(buffer)) != -1) {
out.write(buffer, 0, len);
}
return out.toByteArray();
} catch (IOException e) {
System.err.println("Failed to decompress GZIP data: " + e.getMessage());
// Optionally rethrow the exception or return an Optional<byte[]>
return null; // or throw new RuntimeException(e);
}
} else {
return compressedData;
}
}
public String getGrpcMethodKey() {
return service + "." + method;
}
public Integer getStreamId() {
return streamId;
}
public String getService() {
return service;
}
public String getMethod() {
return method;
}
public ByteBuf getByteData() {
return byteData;
}
public Class<?> getClazz() {
return clazz;
}
public void setClazz(Class<?> clazz) {
this.clazz = clazz;
}
public boolean isStream() {
return stream;
}
public void setStream(boolean stream) {
this.stream = stream;
}
public boolean isStreamFirstData() {
return streamFirstData;
}
public void setStreamFirstData(boolean streamFirstData) {
this.streamFirstData = streamFirstData;
}
public Http2Headers getHeaders() {
return headers;
}
public void setHeaders(Http2Headers headers) {
this.headers = headers;
}
public GrpcInvokeTypeEnum getGrpcType() {
return grpcType;
}
public void setGrpcType(GrpcInvokeTypeEnum grpcType) {
this.grpcType = grpcType;
}
}

@ -0,0 +1,114 @@
package com.taobao.arthas.grpc.server.handler;
import arthas.grpc.common.ArthasGrpc;
import com.taobao.arthas.grpc.server.handler.annotation.GrpcMethod;
import com.taobao.arthas.grpc.server.handler.annotation.GrpcService;
import com.taobao.arthas.grpc.server.utils.ByteUtil;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
* @author: FengYe
* @date: 2024/9/5 02:05
* @description: GrpcResponse
*/
public class GrpcResponse<T> {
private Map<String, String> headers;
/**
* service
*/
private String service;
/**
* method
*/
private String method;
/**
*
*/
private ByteBuf byteData;
/**
* class
*/
private Class<?> clazz;
{
headers = new HashMap<>();
headers.put("content-type", "application/grpc");
headers.put("grpc-encoding", "identity");
headers.put("grpc-accept-encoding", "identity,deflate,gzip");
}
public GrpcResponse() {
}
public GrpcResponse(Method method) {
this.service = method.getDeclaringClass().getAnnotation(GrpcService.class).value();
this.method = method.getAnnotation(GrpcMethod.class).value();
}
public Http2Headers getEndHeader() {
Http2Headers endHeader = new DefaultHttp2Headers().status("200");
headers.forEach(endHeader::set);
return endHeader;
}
public Http2Headers getEndStreamHeader() {
return new DefaultHttp2Headers().set("grpc-status", "0");
}
public static Http2Headers getDefaultEndStreamHeader() {
return new DefaultHttp2Headers().set("grpc-status", "0");
}
public ByteBuf getResponseData() {
return byteData;
}
public void writeResponseData(Object response) {
byte[] encode = null;
try {
if (ArthasGrpc.ErrorRes.class.equals(clazz)) {
encode = ((ArthasGrpc.ErrorRes) response).toByteArray();
} else {
encode = (byte[]) GrpcDispatcher.responseToByteArrayMap.get(GrpcDispatcher.generateGrpcMethodKey(service, method)).invoke(response);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
this.byteData = ByteUtil.newByteBuf();
this.byteData.writeBoolean(false);
this.byteData.writeInt(encode.length);
this.byteData.writeBytes(encode);
}
public void setClazz(Class<?> clazz) {
this.clazz = clazz;
}
public String getService() {
return service;
}
public void setService(String service) {
this.service = service;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
}

@ -0,0 +1,16 @@
package com.taobao.arthas.grpc.server.handler;
import java.util.List;
/**
* @author: FengYe
* @date: 2024/9/18 23:12
* @description: http2 frame grpc
*/
public class Http2FrameRequest {
/**
* grpc
*/
private List<GrpcRequest> grpcRequests;
}

@ -0,0 +1,113 @@
package com.taobao.arthas.grpc.server.handler;
import arthas.grpc.common.ArthasGrpc;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.grpc.server.handler.executor.GrpcExecutorFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http2.*;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.*;
import java.lang.invoke.MethodHandles;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author: FengYe
* @date: 2024/7/7 9:58
* @description: Http2Handler
*/
public class Http2Handler extends SimpleChannelInboundHandler<Http2Frame> {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
private GrpcDispatcher grpcDispatcher;
private GrpcExecutorFactory grpcExecutorFactory;
private final EventExecutorGroup executorGroup = new NioEventLoopGroup();
/**
*
*/
private ConcurrentHashMap<Integer, GrpcRequest> dataBuffer = new ConcurrentHashMap<>();
private static final String HEADER_PATH = ":path";
public Http2Handler(GrpcDispatcher grpcDispatcher, GrpcExecutorFactory grpcExecutorFactory) {
this.grpcDispatcher = grpcDispatcher;
this.grpcExecutorFactory = grpcExecutorFactory;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Frame frame) throws IOException {
if (frame instanceof Http2HeadersFrame) {
handleGrpcRequest((Http2HeadersFrame) frame, ctx);
} else if (frame instanceof Http2DataFrame) {
handleGrpcData((Http2DataFrame) frame, ctx);
} else if (frame instanceof Http2ResetFrame) {
handleResetStream((Http2ResetFrame) frame, ctx);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
private void handleGrpcRequest(Http2HeadersFrame headersFrame, ChannelHandlerContext ctx) {
int id = headersFrame.stream().id();
String path = headersFrame.headers().get(HEADER_PATH).toString();
// 去掉前面的斜杠,然后按斜杠分割
String[] parts = path.substring(1).split("/");
GrpcRequest grpcRequest = new GrpcRequest(headersFrame.stream().id(), parts[0], parts[1]);
grpcRequest.setHeaders(headersFrame.headers());
GrpcDispatcher.checkGrpcType(grpcRequest);
dataBuffer.put(id, grpcRequest);
System.out.println("Received headers: " + headersFrame.headers());
}
private void handleGrpcData(Http2DataFrame dataFrame, ChannelHandlerContext ctx) throws IOException {
int streamId = dataFrame.stream().id();
GrpcRequest grpcRequest = dataBuffer.get(streamId);
ByteBuf content = dataFrame.content();
grpcRequest.writeData(content);
executorGroup.execute(() -> {
try {
grpcExecutorFactory.getExecutor(grpcRequest.getGrpcType()).execute(grpcRequest, dataFrame, ctx);
} catch (Throwable e) {
logger.error("handleGrpcData error", e);
processError(ctx, e, dataFrame.stream());
}
});
}
private void handleResetStream(Http2ResetFrame resetFrame, ChannelHandlerContext ctx) {
int id = resetFrame.stream().id();
System.out.println("handleResetStream");
dataBuffer.remove(id);
}
private void processError(ChannelHandlerContext ctx, Throwable e, Http2FrameStream stream) {
GrpcResponse response = new GrpcResponse();
ArthasGrpc.ErrorRes.Builder builder = ArthasGrpc.ErrorRes.newBuilder();
ArthasGrpc.ErrorRes errorRes = builder.setErrorMsg(Optional.ofNullable(e.getMessage()).orElse("")).build();
response.setClazz(ArthasGrpc.ErrorRes.class);
response.writeResponseData(errorRes);
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndHeader()).stream(stream));
ctx.writeAndFlush(new DefaultHttp2DataFrame(response.getResponseData()).stream(stream));
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(stream));
}
}

@ -0,0 +1,13 @@
package com.taobao.arthas.grpc.server.handler;
/**
* @author: FengYe
* @date: 2024/10/24 00:22
* @description: StreamObserver
*/
public interface StreamObserver<V> {
void onNext(V req);
void onCompleted();
}

@ -0,0 +1,23 @@
package com.taobao.arthas.grpc.server.handler.annotation;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author: FengYe
* @date: 2024/9/6 01:57
* @description: GrpcMethod
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface GrpcMethod {
String value() default "";
boolean stream() default false;
GrpcInvokeTypeEnum grpcType() default GrpcInvokeTypeEnum.UNARY;
}

@ -0,0 +1,17 @@
package com.taobao.arthas.grpc.server.handler.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author: FengYe
* @date: 2024/9/6 01:57
* @description: GrpcService
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface GrpcService {
String value() default "";
}

@ -0,0 +1,13 @@
package com.taobao.arthas.grpc.server.handler.constant;
/**
* @author: FengYe
* @date: 2024/10/24 01:06
* @description: StreamTypeEnum
*/
public enum GrpcInvokeTypeEnum {
UNARY,
SERVER_STREAM,
CLIENT_STREAM,
BI_STREAM;
}

@ -0,0 +1,22 @@
package com.taobao.arthas.grpc.server.handler.executor;
import com.taobao.arthas.grpc.server.handler.GrpcDispatcher;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.StreamObserver;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author: FengYe
* @date: 2024/10/24 02:07
* @description: AbstractGrpcExecutor
*/
public abstract class AbstractGrpcExecutor implements GrpcExecutor{
protected GrpcDispatcher dispatcher;
protected ConcurrentHashMap<Integer, StreamObserver<GrpcRequest>> requestStreamObserverMap = new ConcurrentHashMap<>();
public AbstractGrpcExecutor(GrpcDispatcher dispatcher) {
this.dispatcher = dispatcher;
}
}

@ -0,0 +1,66 @@
package com.taobao.arthas.grpc.server.handler.executor;
import com.taobao.arthas.grpc.server.handler.GrpcDispatcher;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.GrpcResponse;
import com.taobao.arthas.grpc.server.handler.StreamObserver;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author: FengYe
* @date: 2024/10/24 01:52
* @description: BiStreamProcessor
*/
public class BiStreamExecutor extends AbstractGrpcExecutor {
public BiStreamExecutor(GrpcDispatcher dispatcher) {
super(dispatcher);
}
@Override
public GrpcInvokeTypeEnum supportGrpcType() {
return GrpcInvokeTypeEnum.BI_STREAM;
}
@Override
public void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable {
Integer streamId = request.getStreamId();
StreamObserver<GrpcRequest> requestObserver = requestStreamObserverMap.computeIfAbsent(streamId, id->{
StreamObserver<GrpcResponse> responseObserver = new StreamObserver<GrpcResponse>() {
AtomicBoolean sendHeader = new AtomicBoolean(false);
@Override
public void onNext(GrpcResponse res) {
// 控制流只能响应一次header
if (!sendHeader.get()) {
sendHeader.compareAndSet(false, true);
context.writeAndFlush(new DefaultHttp2HeadersFrame(res.getEndHeader()).stream(frame.stream()));
}
context.writeAndFlush(new DefaultHttp2DataFrame(res.getResponseData()).stream(frame.stream()));
}
@Override
public void onCompleted() {
context.writeAndFlush(new DefaultHttp2HeadersFrame(GrpcResponse.getDefaultEndStreamHeader(), true).stream(frame.stream()));
}
};
try {
return dispatcher.biStreamExecute(request, responseObserver);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
requestObserver.onNext(request);
if (frame.isEndStream()) {
requestObserver.onCompleted();
}
}
}

@ -0,0 +1,66 @@
package com.taobao.arthas.grpc.server.handler.executor;
import com.taobao.arthas.grpc.server.handler.GrpcDispatcher;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.GrpcResponse;
import com.taobao.arthas.grpc.server.handler.StreamObserver;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author: FengYe
* @date: 2024/10/24 01:51
* @description: UnaryProcessor
*/
public class ClientStreamExecutor extends AbstractGrpcExecutor {
public ClientStreamExecutor(GrpcDispatcher dispatcher) {
super(dispatcher);
}
@Override
public GrpcInvokeTypeEnum supportGrpcType() {
return GrpcInvokeTypeEnum.CLIENT_STREAM;
}
@Override
public void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable {
Integer streamId = request.getStreamId();
StreamObserver<GrpcRequest> requestObserver = requestStreamObserverMap.computeIfAbsent(streamId, id->{
StreamObserver<GrpcResponse> responseObserver = new StreamObserver<GrpcResponse>() {
AtomicBoolean sendHeader = new AtomicBoolean(false);
@Override
public void onNext(GrpcResponse res) {
// 控制流只能响应一次header
if (!sendHeader.get()) {
sendHeader.compareAndSet(false, true);
context.writeAndFlush(new DefaultHttp2HeadersFrame(res.getEndHeader()).stream(frame.stream()));
}
context.writeAndFlush(new DefaultHttp2DataFrame(res.getResponseData()).stream(frame.stream()));
}
@Override
public void onCompleted() {
context.writeAndFlush(new DefaultHttp2HeadersFrame(GrpcResponse.getDefaultEndStreamHeader(), true).stream(frame.stream()));
}
};
try {
return dispatcher.clientStreamExecute(request, responseObserver);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
requestObserver.onNext(request);
if (frame.isEndStream()) {
requestObserver.onCompleted();
}
}
}

@ -0,0 +1,17 @@
package com.taobao.arthas.grpc.server.handler.executor;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2DataFrame;
/**
* @author: FengYe
* @date: 2024/10/24 01:50
* @description: GrpcProcessor
*/
public interface GrpcExecutor {
GrpcInvokeTypeEnum supportGrpcType();
void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable;
}

@ -0,0 +1,55 @@
package com.taobao.arthas.grpc.server.handler.executor;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.grpc.server.handler.GrpcDispatcher;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import com.taobao.arthas.grpc.server.utils.ReflectUtil;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author: FengYe
* @date: 2024/10/24 01:56
* @description: GrpcExecutorFactory
*/
public class GrpcExecutorFactory {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
public static final String DEFAULT_GRPC_EXECUTOR_PACKAGE_NAME = "com.taobao.arthas.grpc.server.handler.executor";
private final Map<GrpcInvokeTypeEnum, GrpcExecutor> map = new HashMap<>();
public void loadExecutor(GrpcDispatcher dispatcher) {
List<Class<?>> classes = ReflectUtil.findClasses(DEFAULT_GRPC_EXECUTOR_PACKAGE_NAME);
for (Class<?> clazz : classes) {
if (GrpcExecutor.class.isAssignableFrom(clazz)) {
try {
if (AbstractGrpcExecutor.class.equals(clazz) || GrpcExecutor.class.equals(clazz)) {
continue;
}
if (AbstractGrpcExecutor.class.isAssignableFrom(clazz)) {
Constructor<?> constructor = clazz.getConstructor(GrpcDispatcher.class);
GrpcExecutor executor = (GrpcExecutor) constructor.newInstance(dispatcher);
map.put(executor.supportGrpcType(), executor);
} else {
Constructor<?> constructor = clazz.getConstructor();
GrpcExecutor executor = (GrpcExecutor) constructor.newInstance();
map.put(executor.supportGrpcType(), executor);
}
} catch (Exception e) {
logger.error("GrpcExecutorFactory loadExecutor error", e);
}
}
}
}
public GrpcExecutor getExecutor(GrpcInvokeTypeEnum grpcType) {
return map.get(grpcType);
}
}

@ -0,0 +1,57 @@
package com.taobao.arthas.grpc.server.handler.executor;
import com.taobao.arthas.grpc.server.handler.GrpcDispatcher;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.GrpcResponse;
import com.taobao.arthas.grpc.server.handler.StreamObserver;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author: FengYe
* @date: 2024/10/24 01:51
* @description: UnaryProcessor
*/
public class ServerStreamExecutor extends AbstractGrpcExecutor {
public ServerStreamExecutor(GrpcDispatcher dispatcher) {
super(dispatcher);
}
@Override
public GrpcInvokeTypeEnum supportGrpcType() {
return GrpcInvokeTypeEnum.SERVER_STREAM;
}
@Override
public void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable {
StreamObserver<GrpcResponse> responseObserver = new StreamObserver<GrpcResponse>() {
AtomicBoolean sendHeader = new AtomicBoolean(false);
@Override
public void onNext(GrpcResponse res) {
// 控制流只能响应一次header
if (!sendHeader.get()) {
sendHeader.compareAndSet(false, true);
context.writeAndFlush(new DefaultHttp2HeadersFrame(res.getEndHeader()).stream(frame.stream()));
}
context.writeAndFlush(new DefaultHttp2DataFrame(res.getResponseData()).stream(frame.stream()));
}
@Override
public void onCompleted() {
context.writeAndFlush(new DefaultHttp2HeadersFrame(GrpcResponse.getDefaultEndStreamHeader(), true).stream(frame.stream()));
}
};
try {
dispatcher.serverStreamExecute(request, responseObserver);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}

@ -0,0 +1,38 @@
package com.taobao.arthas.grpc.server.handler.executor;
import com.taobao.arthas.grpc.server.handler.GrpcDispatcher;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.GrpcResponse;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
/**
* @author: FengYe
* @date: 2024/10/24 01:51
* @description: UnaryProcessor
*/
public class UnaryExecutor extends AbstractGrpcExecutor {
public UnaryExecutor(GrpcDispatcher dispatcher) {
super(dispatcher);
}
@Override
public GrpcInvokeTypeEnum supportGrpcType() {
return GrpcInvokeTypeEnum.UNARY;
}
@Override
public void execute(GrpcRequest request, Http2DataFrame frame, ChannelHandlerContext context) throws Throwable {
// 一元调用,等到 endStream 再响应
if (frame.isEndStream()) {
GrpcResponse response = dispatcher.unaryExecute(request);
context.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndHeader()).stream(frame.stream()));
context.writeAndFlush(new DefaultHttp2DataFrame(response.getResponseData()).stream(frame.stream()));
context.writeAndFlush(new DefaultHttp2HeadersFrame(response.getEndStreamHeader(), true).stream(frame.stream()));
}
}
}

@ -0,0 +1,26 @@
package com.taobao.arthas.grpc.server.service;
import arthas.grpc.unittest.ArthasUnittest;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.GrpcResponse;
import com.taobao.arthas.grpc.server.handler.StreamObserver;
/**
* @author: FengYe
* @date: 2024/6/30 11:42
* @description: ArthasSampleService
*/
public interface ArthasSampleService {
ArthasUnittest.ArthasUnittestResponse unary(ArthasUnittest.ArthasUnittestRequest command);
ArthasUnittest.ArthasUnittestResponse unaryAddSum(ArthasUnittest.ArthasUnittestRequest command);
ArthasUnittest.ArthasUnittestResponse unaryGetSum(ArthasUnittest.ArthasUnittestRequest command);
StreamObserver<GrpcRequest<ArthasUnittest.ArthasUnittestRequest>> clientStreamSum(StreamObserver<GrpcResponse<ArthasUnittest.ArthasUnittestResponse>> observer);
void serverStream(ArthasUnittest.ArthasUnittestRequest request, StreamObserver<GrpcResponse<ArthasUnittest.ArthasUnittestResponse>> observer);
StreamObserver<GrpcRequest<ArthasUnittest.ArthasUnittestRequest>> biStream(StreamObserver<GrpcResponse<ArthasUnittest.ArthasUnittestResponse>> observer);
}

@ -0,0 +1,133 @@
package com.taobao.arthas.grpc.server.service.impl;
import arthas.grpc.unittest.ArthasUnittest;
import com.google.protobuf.InvalidProtocolBufferException;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.GrpcResponse;
import com.taobao.arthas.grpc.server.handler.StreamObserver;
import com.taobao.arthas.grpc.server.handler.annotation.GrpcMethod;
import com.taobao.arthas.grpc.server.handler.annotation.GrpcService;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import com.taobao.arthas.grpc.server.service.ArthasSampleService;
import com.taobao.arthas.grpc.server.utils.ByteUtil;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author: FengYe
* @date: 2024/6/30 11:43
* @description: ArthasSampleServiceImpl
*/
@GrpcService("arthas.grpc.unittest.ArthasUnittestService")
public class ArthasSampleServiceImpl implements ArthasSampleService {
private ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
@Override
@GrpcMethod(value = "unary")
public ArthasUnittest.ArthasUnittestResponse unary(ArthasUnittest.ArthasUnittestRequest command) {
ArthasUnittest.ArthasUnittestResponse.Builder builder = ArthasUnittest.ArthasUnittestResponse.newBuilder();
builder.setMessage(command.getMessage());
return builder.build();
}
@Override
@GrpcMethod(value = "unaryAddSum")
public ArthasUnittest.ArthasUnittestResponse unaryAddSum(ArthasUnittest.ArthasUnittestRequest command) {
ArthasUnittest.ArthasUnittestResponse.Builder builder = ArthasUnittest.ArthasUnittestResponse.newBuilder();
builder.setMessage(command.getMessage());
map.merge(command.getId(), command.getNum(), Integer::sum);
return builder.build();
}
@Override
@GrpcMethod(value = "unaryGetSum")
public ArthasUnittest.ArthasUnittestResponse unaryGetSum(ArthasUnittest.ArthasUnittestRequest command) {
ArthasUnittest.ArthasUnittestResponse.Builder builder = ArthasUnittest.ArthasUnittestResponse.newBuilder();
builder.setMessage(command.getMessage());
Integer sum = map.getOrDefault(command.getId(), 0);
builder.setNum(sum);
return builder.build();
}
@Override
@GrpcMethod(value = "clientStreamSum", grpcType = GrpcInvokeTypeEnum.CLIENT_STREAM)
public StreamObserver<GrpcRequest<ArthasUnittest.ArthasUnittestRequest>> clientStreamSum(StreamObserver<GrpcResponse<ArthasUnittest.ArthasUnittestResponse>> observer) {
return new StreamObserver<GrpcRequest<ArthasUnittest.ArthasUnittestRequest>>() {
AtomicInteger sum = new AtomicInteger(0);
@Override
public void onNext(GrpcRequest<ArthasUnittest.ArthasUnittestRequest> req) {
try {
byte[] bytes = req.readData();
while (bytes != null && bytes.length != 0) {
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.parseFrom(bytes);
sum.addAndGet(request.getNum());
bytes = req.readData();
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Override
public void onCompleted() {
ArthasUnittest.ArthasUnittestResponse response = ArthasUnittest.ArthasUnittestResponse.newBuilder()
.setNum(sum.get())
.build();
GrpcResponse<ArthasUnittest.ArthasUnittestResponse> grpcResponse = new GrpcResponse<>();
grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService");
grpcResponse.setMethod("clientStreamSum");
grpcResponse.writeResponseData(response);
observer.onNext(grpcResponse);
observer.onCompleted();
}
};
}
@Override
@GrpcMethod(value = "serverStream", grpcType = GrpcInvokeTypeEnum.SERVER_STREAM)
public void serverStream(ArthasUnittest.ArthasUnittestRequest request, StreamObserver<GrpcResponse<ArthasUnittest.ArthasUnittestResponse>> observer) {
for (int i = 0; i < 5; i++) {
ArthasUnittest.ArthasUnittestResponse response = ArthasUnittest.ArthasUnittestResponse.newBuilder()
.setMessage("Server response " + i + " to " + request.getMessage())
.build();
GrpcResponse<ArthasUnittest.ArthasUnittestResponse> grpcResponse = new GrpcResponse<>();
grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService");
grpcResponse.setMethod("serverStream");
grpcResponse.writeResponseData(response);
observer.onNext(grpcResponse);
}
observer.onCompleted();
}
@Override
@GrpcMethod(value = "biStream", grpcType = GrpcInvokeTypeEnum.BI_STREAM)
public StreamObserver<GrpcRequest<ArthasUnittest.ArthasUnittestRequest>> biStream(StreamObserver<GrpcResponse<ArthasUnittest.ArthasUnittestResponse>> observer) {
return new StreamObserver<GrpcRequest<ArthasUnittest.ArthasUnittestRequest>>() {
@Override
public void onNext(GrpcRequest<ArthasUnittest.ArthasUnittestRequest> req) {
try {
byte[] bytes = req.readData();
while (bytes != null && bytes.length != 0) {
GrpcResponse<ArthasUnittest.ArthasUnittestResponse> grpcResponse = new GrpcResponse<>();
grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService");
grpcResponse.setMethod("biStream");
grpcResponse.writeResponseData(ArthasUnittest.ArthasUnittestResponse.parseFrom(bytes));
observer.onNext(grpcResponse);
bytes = req.readData();
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
}

@ -0,0 +1,33 @@
package com.taobao.arthas.grpc.server.utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
/**
* @author: FengYe
* @date: 2024/9/5 00:51
* @description: ByteUtil
*/
public class ByteUtil {
public static ByteBuf newByteBuf() {
return PooledByteBufAllocator.DEFAULT.buffer();
}
public static ByteBuf newByteBuf(byte[] bytes) {
return PooledByteBufAllocator.DEFAULT.buffer(bytes.length).writeBytes(bytes);
}
public static byte[] getBytes(ByteBuf buf) {
if (buf.hasArray()) {
// 如果 ByteBuf 是一个支持底层数组的实现,直接获取数组
return buf.array();
} else {
// 创建一个新的 byte 数组
byte[] bytes = new byte[buf.readableBytes()];
// 将 ByteBuf 的内容复制到 byte 数组中
buf.getBytes(buf.readerIndex(), bytes);
return bytes;
}
}
}

@ -0,0 +1,35 @@
package com.taobao.arthas.grpc.server.utils;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
* @author: FengYe
* @date: 2024/9/6 02:20
* @description: ReflectUtil
*/
public class ReflectUtil {
public static List<Class<?>> findClasses(String packageName) {
List<Class<?>> classes = new ArrayList<>();
String path = packageName.replace('.', '/');
try {
URL resource = Thread.currentThread().getContextClassLoader().getResource(path);
if (resource != null) {
File directory = new File(resource.toURI());
if (directory.exists()) {
for (File file : directory.listFiles()) {
if (file.isFile() && file.getName().endsWith(".class")) {
String className = packageName + '.' + file.getName().replace(".class", "");
classes.add(Class.forName(className));
}
}
}
}
} catch (Exception e) {
}
return classes;
}
}

@ -0,0 +1,7 @@
syntax = "proto3";
package arthas.grpc.common;
message ErrorRes {
string errorMsg = 1;
}

@ -0,0 +1,24 @@
syntax = "proto3";
package arthas.grpc.unittest;
service ArthasUnittestService {
rpc unary(ArthasUnittestRequest) returns (ArthasUnittestResponse);
rpc unaryAddSum(ArthasUnittestRequest) returns (ArthasUnittestResponse);
rpc unaryGetSum(ArthasUnittestRequest) returns (ArthasUnittestResponse);
rpc clientStreamSum(stream ArthasUnittestRequest) returns (ArthasUnittestResponse);
rpc serverStream(ArthasUnittestRequest) returns (stream ArthasUnittestResponse);
rpc biStream(stream ArthasUnittestRequest) returns (stream ArthasUnittestResponse);
}
message ArthasUnittestRequest {
int32 id = 1;
string message = 2;
int32 num = 3;
}
message ArthasUnittestResponse{
int32 id = 1;
string message = 2;
int32 num = 3;
}

@ -0,0 +1,268 @@
package unittest.grpc;
import arthas.grpc.unittest.ArthasUnittest;
import arthas.grpc.unittest.ArthasUnittestServiceGrpc;
import com.taobao.arthas.grpc.server.ArthasGrpcServer;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author: FengYe
* @date: 2024/9/24 00:17
* @description: GrpcUnaryTest
*/
public class GrpcTest {
private static final String HOST = "localhost";
private static final int PORT = 9090;
private static final String HOST_PORT = HOST + ":" + PORT;
private static final String UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME = "unittest.grpc.service.impl";
private ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub blockingStub = null;
Random random = new Random();
ExecutorService threadPool = Executors.newFixedThreadPool(10);
@Before
public void startServer() {
Thread grpcWebProxyStart = new Thread(() -> {
ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(PORT, UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME);
arthasGrpcServer.start();
});
grpcWebProxyStart.start();
}
@Test
public void testUnary() {
ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT)
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel);
try {
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("unaryInvoke").build();
ArthasUnittest.ArthasUnittestResponse res = stub.unary(request);
System.out.println(res.getMessage());
} finally {
channel.shutdownNow();
}
}
@Test
public void testUnarySum() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT)
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel);
for (int i = 0; i < 10; i++) {
AtomicInteger sum = new AtomicInteger(0);
int finalId = i;
for (int j = 0; j < 100; j++) {
int num = random.nextInt(101);
sum.addAndGet(num);
threadPool.submit(() -> {
addSum(stub, finalId, num);
});
}
Thread.sleep(2000L);
int grpcSum = getSum(stub, finalId);
System.out.println("id:" + finalId + ",sum:" + sum.get() + ",grpcSum:" + grpcSum);
Assert.assertEquals(sum.get(), grpcSum);
}
channel.shutdown();
}
// 用于测试客户端流
@Test
public void testClientStreamSum() throws Throwable {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel);
AtomicInteger sum = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<ArthasUnittest.ArthasUnittestRequest> clientStreamObserver = stub.clientStreamSum(new StreamObserver<ArthasUnittest.ArthasUnittestResponse>() {
@Override
public void onNext(ArthasUnittest.ArthasUnittestResponse response) {
System.out.println("local sum:" + sum + ", grpc sum:" + response.getNum());
Assert.assertEquals(sum.get(), response.getNum());
}
@Override
public void onError(Throwable t) {
System.err.println("Error: " + t);
}
@Override
public void onCompleted() {
System.out.println("testClientStreamSum completed.");
latch.countDown();
}
});
for (int j = 0; j < 1000; j++) {
int num = random.nextInt(1001);
sum.addAndGet(num);
clientStreamObserver.onNext(ArthasUnittest.ArthasUnittestRequest.newBuilder().setNum(num).build());
}
clientStreamObserver.onCompleted();
latch.await();
channel.shutdown();
}
// 用于测试请求数据隔离性
@Test
public void testDataIsolation() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel);
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> {
AtomicInteger sum = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<ArthasUnittest.ArthasUnittestRequest> clientStreamObserver = stub.clientStreamSum(new StreamObserver<ArthasUnittest.ArthasUnittestResponse>() {
@Override
public void onNext(ArthasUnittest.ArthasUnittestResponse response) {
System.out.println("local sum:" + sum + ", grpc sum:" + response.getNum());
Assert.assertEquals(sum.get(), response.getNum());
}
@Override
public void onError(Throwable t) {
System.err.println("Error: " + t);
}
@Override
public void onCompleted() {
System.out.println("testDataIsolation completed.");
latch.countDown();
}
});
for (int j = 0; j < 5; j++) {
int num = random.nextInt(101);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
sum.addAndGet(num);
clientStreamObserver.onNext(ArthasUnittest.ArthasUnittestRequest.newBuilder().setNum(num).build());
}
clientStreamObserver.onCompleted();
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
channel.shutdown();
});
}
Thread.sleep(7000L);
}
@Test
public void testServerStream() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel);
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("serverStream").build();
stub.serverStream(request, new StreamObserver<ArthasUnittest.ArthasUnittestResponse>() {
@Override
public void onNext(ArthasUnittest.ArthasUnittestResponse value) {
System.out.println("testServerStream client receive: " + value.getMessage());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
System.out.println("testServerStream completed");
}
});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
channel.shutdown();
}
}
// 用于测试双向流
@Test
public void testBiStream() throws Throwable {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel);
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<ArthasUnittest.ArthasUnittestRequest> biStreamObserver = stub.biStream(new StreamObserver<ArthasUnittest.ArthasUnittestResponse>() {
@Override
public void onNext(ArthasUnittest.ArthasUnittestResponse response) {
System.out.println("testBiStream receive: "+response.getMessage());
}
@Override
public void onError(Throwable t) {
System.err.println("Error: " + t);
}
@Override
public void onCompleted() {
System.out.println("testBiStream completed.");
latch.countDown();
}
});
String[] messages = new String[]{"testBiStream1","testBiStream2","testBiStream3"};
for (String msg : messages) {
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage(msg).build();
biStreamObserver.onNext(request);
}
Thread.sleep(2000);
biStreamObserver.onCompleted();
latch.await();
channel.shutdown();
}
private void addSum(ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub, int id, int num) {
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setId(id).setNum(num).build();
ArthasUnittest.ArthasUnittestResponse res = stub.unaryAddSum(request);
}
private int getSum(ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub, int id) {
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setId(id).build();
ArthasUnittest.ArthasUnittestResponse res = stub.unaryGetSum(request);
return res.getNum();
}
}

@ -0,0 +1,26 @@
package unittest.grpc.service;
import arthas.grpc.unittest.ArthasUnittest.ArthasUnittestRequest;
import arthas.grpc.unittest.ArthasUnittest.ArthasUnittestResponse;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.GrpcResponse;
import com.taobao.arthas.grpc.server.handler.StreamObserver;
/**
* @author: FengYe
* @date: 2024/6/30 11:42
* @description: ArthasSampleService
*/
public interface ArthasUnittestService {
ArthasUnittestResponse unary(ArthasUnittestRequest command);
ArthasUnittestResponse unaryAddSum(ArthasUnittestRequest command);
ArthasUnittestResponse unaryGetSum(ArthasUnittestRequest command);
StreamObserver<GrpcRequest<ArthasUnittestRequest>> clientStreamSum(StreamObserver<GrpcResponse<ArthasUnittestResponse>> observer);
void serverStream(ArthasUnittestRequest request, StreamObserver<GrpcResponse<ArthasUnittestResponse>> observer);
StreamObserver<GrpcRequest<ArthasUnittestRequest>> biStream(StreamObserver<GrpcResponse<ArthasUnittestResponse>> observer);
}

@ -0,0 +1,134 @@
package unittest.grpc.service.impl;
import arthas.grpc.unittest.ArthasUnittest;
import arthas.grpc.unittest.ArthasUnittest.ArthasUnittestRequest;
import arthas.grpc.unittest.ArthasUnittest.ArthasUnittestResponse;
import com.google.protobuf.InvalidProtocolBufferException;
import com.taobao.arthas.grpc.server.handler.GrpcRequest;
import com.taobao.arthas.grpc.server.handler.GrpcResponse;
import com.taobao.arthas.grpc.server.handler.StreamObserver;
import com.taobao.arthas.grpc.server.handler.annotation.GrpcMethod;
import com.taobao.arthas.grpc.server.handler.annotation.GrpcService;
import com.taobao.arthas.grpc.server.handler.constant.GrpcInvokeTypeEnum;
import unittest.grpc.service.ArthasUnittestService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author: FengYe
* @date: 2024/6/30 11:43
* @description: ArthasSampleServiceImpl
*/
@GrpcService("arthas.grpc.unittest.ArthasUnittestService")
public class ArthasUnittestServiceImpl implements ArthasUnittestService {
private ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
@Override
@GrpcMethod(value = "unary")
public ArthasUnittestResponse unary(ArthasUnittestRequest command) {
ArthasUnittestResponse.Builder builder = ArthasUnittestResponse.newBuilder();
builder.setMessage(command.getMessage());
return builder.build();
}
@Override
@GrpcMethod(value = "unaryAddSum")
public ArthasUnittestResponse unaryAddSum(ArthasUnittestRequest command) {
ArthasUnittestResponse.Builder builder = ArthasUnittestResponse.newBuilder();
builder.setMessage(command.getMessage());
map.merge(command.getId(), command.getNum(), Integer::sum);
return builder.build();
}
@Override
@GrpcMethod(value = "unaryGetSum")
public ArthasUnittestResponse unaryGetSum(ArthasUnittestRequest command) {
ArthasUnittestResponse.Builder builder = ArthasUnittestResponse.newBuilder();
builder.setMessage(command.getMessage());
Integer sum = map.getOrDefault(command.getId(), 0);
builder.setNum(sum);
return builder.build();
}
@Override
@GrpcMethod(value = "clientStreamSum", grpcType = GrpcInvokeTypeEnum.CLIENT_STREAM)
public StreamObserver<GrpcRequest<ArthasUnittestRequest>> clientStreamSum(StreamObserver<GrpcResponse<ArthasUnittestResponse>> observer) {
return new StreamObserver<GrpcRequest<ArthasUnittestRequest>>() {
AtomicInteger sum = new AtomicInteger(0);
@Override
public void onNext(GrpcRequest<ArthasUnittestRequest> req) {
try {
byte[] bytes = req.readData();
while (bytes != null && bytes.length != 0) {
ArthasUnittestRequest request = ArthasUnittestRequest.parseFrom(bytes);
sum.addAndGet(request.getNum());
bytes = req.readData();
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Override
public void onCompleted() {
ArthasUnittestResponse response = ArthasUnittestResponse.newBuilder()
.setNum(sum.get())
.build();
GrpcResponse<ArthasUnittestResponse> grpcResponse = new GrpcResponse<>();
grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService");
grpcResponse.setMethod("clientStreamSum");
grpcResponse.writeResponseData(response);
observer.onNext(grpcResponse);
observer.onCompleted();
}
};
}
@Override
@GrpcMethod(value = "serverStream", grpcType = GrpcInvokeTypeEnum.SERVER_STREAM)
public void serverStream(ArthasUnittestRequest request, StreamObserver<GrpcResponse<ArthasUnittestResponse>> observer) {
for (int i = 0; i < 5; i++) {
ArthasUnittest.ArthasUnittestResponse response = ArthasUnittest.ArthasUnittestResponse.newBuilder()
.setMessage("Server response " + i + " to " + request.getMessage())
.build();
GrpcResponse<ArthasUnittestResponse> grpcResponse = new GrpcResponse<>();
grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService");
grpcResponse.setMethod("serverStream");
grpcResponse.writeResponseData(response);
observer.onNext(grpcResponse);
}
observer.onCompleted();
}
@Override
@GrpcMethod(value = "biStream", grpcType = GrpcInvokeTypeEnum.BI_STREAM)
public StreamObserver<GrpcRequest<ArthasUnittestRequest>> biStream(StreamObserver<GrpcResponse<ArthasUnittestResponse>> observer) {
return new StreamObserver<GrpcRequest<ArthasUnittestRequest>>() {
@Override
public void onNext(GrpcRequest<ArthasUnittestRequest> req) {
try {
byte[] bytes = req.readData();
while (bytes != null && bytes.length != 0) {
GrpcResponse<ArthasUnittestResponse> grpcResponse = new GrpcResponse<>();
grpcResponse.setService("arthas.grpc.unittest.ArthasUnittestService");
grpcResponse.setMethod("biStream");
grpcResponse.writeResponseData(ArthasUnittestResponse.parseFrom(bytes));
observer.onNext(grpcResponse);
bytes = req.readData();
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Override
public void onCompleted() {
observer.onCompleted();
}
};
}
}

@ -80,6 +80,7 @@
<module>labs/cluster-management/native-agent-management-web</module>
<module>labs/cluster-management/native-agent-proxy</module>
<module>labs/cluster-management/native-agent-common</module>
<module>labs/arthas-grpc-server</module>
</modules>
<properties>

Loading…
Cancel
Save