add arthas-grpc-web-proxy module (#2668)

pull/2677/head^2
x956 1 year ago committed by GitHub
parent cceb196517
commit 3745f08752
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,40 @@
## netty grpc web proxy
from: https://github.com/grpc/grpc-web/tree/1.4.2/src/connector
原项目已废弃删除,本项目改用 netty 来做转发。
## 缺点
原项目需要 `.proto` 文件编译的 `.class`才能运行,比如`GreeterGrpc`,本项目同样有这个问题。
## 测试
工程导入IDE之后,进入test目录
在 com.taobao.arthas.grpcweb.proxy.server.GrpcWebProxyServerTest 启动测试
也可以用原项目的相关工程来测试
* https://github.com/grpc/grpc-web/
## 开发验证
可以用其它的 grpc web proxy来抓包辅助验证。
### 用 envoy
下载envoy 后,可以用本项目里的`envoy.yaml`
* `envoy --config-path ./envoy.yaml`
### 使用 grpcwebproxy
* https://github.com/improbable-eng/grpc-web/blob/master/go/grpcwebproxy/README.md
下载后,启动:
* `grpcwebproxy --backend_addr 127.0.0.1:9090 --run_tls_server=false --allow_all_origins`

@ -0,0 +1,147 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>arthas-all</artifactId>
<groupId>com.taobao.arthas</groupId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>arthas-grpc-web-proxy</artifactId>
<name>arthas-grpc-web-proxy</name>
<url>https://github.com/alibaba/arthas</url>
<properties>
<java.version>1.8</java.version>
<grpc.version>1.46.0</grpc.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.arthas</groupId>
<artifactId>arthas-repackage-logger</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.taobao.arthas</groupId>
<artifactId>arthas-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.taobao.arthas</groupId>
<artifactId>arthas-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>mac</id>
<activation>
<os>
<family>mac</family>
</os>
</activation>
<properties>
<os.detected.classifier>osx-x86_64</os.detected.classifier>
</properties>
</profile>
</profiles>
<build>
<finalName>${project.artifactId}</finalName>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protoSourceRoot>${basedir}/src/main/proto</protoSourceRoot>
<protocArtifact>com.google.protobuf:protoc:3.11.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.28.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,27 @@
package com.taobao.arthas.grpcweb.proxy;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
/**
* TODO cors header
* @author hengyunabc 2023-09-07
*
*/
public class CorsUtils {
public static void updateCorsHeader(HttpHeaders headers) {
// headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS,
// StringUtils.joinWith(",", "user-agent", "cache-control", "content-type", "content-transfer-encoding",
// "grpc-timeout", "keep-alive"));
headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*");
headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
headers.set(HttpHeaderNames.ACCESS_CONTROL_REQUEST_HEADERS, "content-type,x-grpc-web,x-user-agent");
headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS,GET,POST,HEAD");
// headers.set(HttpHeaderNames.ACCESS_CONTROL_EXPOSE_HEADERS,
// StringUtils.joinWith(",", "grpc-status", "grpc-message"));
headers.set(HttpHeaderNames.ACCESS_CONTROL_EXPOSE_HEADERS, "*");
}
}

@ -0,0 +1,44 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taobao.arthas.grpcweb.proxy;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
/**
* TODO: Manage the connection pool to talk to the grpc-service
*/
public class GrpcServiceConnectionManager {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
private final ManagedChannel channel;
public GrpcServiceConnectionManager(int grpcPortNum) {
// TODO: Manage a connection pool.
channel = ManagedChannelBuilder.forAddress("localhost", grpcPortNum).usePlaintext().build();
logger.info("**** connection channel initiated");
}
Channel getChannelWithClientInterceptor(GrpcWebClientInterceptor interceptor) {
return ClientInterceptors.intercept(channel, interceptor);
}
}

@ -0,0 +1,75 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taobao.arthas.grpcweb.proxy;
import io.grpc.*;
import io.grpc.ClientCall.Listener;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import java.util.concurrent.CountDownLatch;
class GrpcWebClientInterceptor implements ClientInterceptor {
private final CountDownLatch latch;
private final SendGrpcWebResponse sendResponse;
GrpcWebClientInterceptor(CountDownLatch latch, SendGrpcWebResponse send) {
this.latch = latch;
sendResponse = send;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel channel) {
return new SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new MetadataResponseListener<RespT>(responseListener), headers);
}
};
}
class MetadataResponseListener<T> extends SimpleForwardingClientCallListener<T> {
private boolean headersSent = false;
MetadataResponseListener(Listener<T> responseListener) {
super(responseListener);
}
@Override
public void onHeaders(Metadata h) {
sendResponse.writeHeaders(h);
headersSent = true;
}
@Override
public void onClose(Status s, Metadata t) {
// TODO 这个函数会在 onCompleted 之前回调,这里有点奇怪
if (!headersSent) {
// seems, sometimes onHeaders() is not called before this method is called!
// so far, they are the error cases. let onError() method in ClientListener
// handle this call. Could ignore this.
// TODO is this correct? what if onError() never gets called?
} else {
sendResponse.writeTrailer(s, t);
latch.countDown();
}
super.onClose(s, t);
}
}
}

@ -0,0 +1,183 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taobao.arthas.grpcweb.proxy;
import com.taobao.arthas.common.Pair;
import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class GrpcWebRequestHandler {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
private final GrpcServiceConnectionManager grpcServiceConnectionManager;
public GrpcWebRequestHandler(GrpcServiceConnectionManager g) {
grpcServiceConnectionManager = g;
}
public void handle(ChannelHandlerContext ctx, FullHttpRequest req) {
// 处理 CORS OPTIONS 请求
if (req.method().equals(HttpMethod.OPTIONS)) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
CorsUtils.updateCorsHeader(response.headers());
ctx.writeAndFlush(response);
return;
}
String contentTypeStr = req.headers().get(HttpHeaderNames.CONTENT_TYPE);
MessageUtils.ContentType contentType = MessageUtils.validateContentType(contentTypeStr);
SendGrpcWebResponse sendResponse = new SendGrpcWebResponse(ctx, req);
try {
// From the request, get the rpc-method name and class name and then get their
// corresponding
// concrete objects.
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
String pathInfo = queryStringDecoder.path();
Pair<String, String> classAndMethodNames = getClassAndMethod(pathInfo);
String className = classAndMethodNames.getFirst();
String methodName = classAndMethodNames.getSecond();
Class cls = getClassObject(className);
if (cls == null) {
logger.error("cannot find service impl in the request, className: " + className);
// incorrect classname specified in the request.
sendResponse.returnUnimplementedStatusCode(className);
return;
}
// Create a ClientInterceptor object
CountDownLatch latch = new CountDownLatch(1);
GrpcWebClientInterceptor interceptor = new GrpcWebClientInterceptor(latch, sendResponse);
Channel channel = grpcServiceConnectionManager.getChannelWithClientInterceptor(interceptor);
// get the stub for the rpc call and the method to be called within the stub
io.grpc.stub.AbstractStub asyncStub = getRpcStub(channel, cls, "newStub");
Metadata headers = MetadataUtil.getHtpHeaders(req.headers());
if (!headers.keys().isEmpty()) {
asyncStub = MetadataUtils.attachHeaders(asyncStub, headers);
}
Method asyncStubCall = getRpcMethod(asyncStub, methodName);
// Get the input object bytes
ByteBuf content = req.content();
InputStream in = new ByteBufInputStream(content);
MessageDeframer deframer = new MessageDeframer();
Object inObj = null;
if (deframer.processInput(in, contentType)) {
inObj = MessageUtils.getInputProtobufObj(asyncStubCall, deframer.getMessageBytes());
}
// Invoke the rpc call
asyncStubCall.invoke(asyncStub, inObj, new GrpcCallResponseReceiver(sendResponse, latch));
if (!latch.await(500 * 1000, TimeUnit.MILLISECONDS)) {
logger.warn("grpc call took too long!");
}
} catch (Exception e) {
logger.error("try to invoke grpc serivce error, uri: {}", req.uri(), e);
sendResponse.writeError(Status.UNAVAILABLE.withCause(e));
}
}
private Pair<String, String> getClassAndMethod(String pathInfo) throws IllegalArgumentException {
// pathInfo starts with "/". ignore that first char.
String[] rpcClassAndMethodTokens = pathInfo.substring(1).split("/");
if (rpcClassAndMethodTokens.length != 2) {
throw new IllegalArgumentException("incorrect pathinfo: " + pathInfo);
}
String rpcClassName = rpcClassAndMethodTokens[0];
String rpcMethodNameRecvd = rpcClassAndMethodTokens[1];
String rpcMethodName = rpcMethodNameRecvd.substring(0, 1).toLowerCase() + rpcMethodNameRecvd.substring(1);
return new Pair<>(rpcClassName, rpcMethodName);
}
private Class<?> getClassObject(String className) {
Class rpcClass = null;
try {
rpcClass = Class.forName(className + "Grpc");
} catch (ClassNotFoundException e) {
logger.info("no such class " + className);
}
return rpcClass;
}
private io.grpc.stub.AbstractStub getRpcStub(Channel ch, Class cls, String stubName) {
try {
Method m = cls.getDeclaredMethod(stubName, io.grpc.Channel.class);
return (io.grpc.stub.AbstractStub) m.invoke(null, ch);
} catch (Exception e) {
logger.warn("Error when fetching " + stubName + " for: " + cls.getName());
throw new IllegalArgumentException(e);
}
}
/**
* Find the matching method in the stub class.
*/
private Method getRpcMethod(Object stub, String rpcMethodName) {
for (Method m : stub.getClass().getMethods()) {
if (m.getName().equals(rpcMethodName)) {
return m;
}
}
throw new IllegalArgumentException("Couldn't find rpcmethod: " + rpcMethodName);
}
private static class GrpcCallResponseReceiver<Object> implements StreamObserver {
private final SendGrpcWebResponse sendResponse;
private final CountDownLatch latch;
GrpcCallResponseReceiver(SendGrpcWebResponse s, CountDownLatch c) {
sendResponse = s;
latch = c;
}
@Override
public void onNext(java.lang.Object resp) {
// TODO verify that the resp object is of Class instance returnedCls.
byte[] outB = ((com.google.protobuf.GeneratedMessageV3) resp).toByteArray();
sendResponse.writeResponse(outB);
}
@Override
public void onError(Throwable t) {
Status s = Status.fromThrowable(t);
sendResponse.writeError(s);
latch.countDown();
}
@Override
public void onCompleted() {
sendResponse.writeTrailer(Status.OK, null);
latch.countDown();
}
}
}

@ -0,0 +1,134 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taobao.arthas.grpcweb.proxy;
import com.taobao.arthas.grpcweb.proxy.MessageUtils.ContentType;
import com.taobao.arthas.common.IOUtils;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
/**
* Reads frames from the input bytes and returns a single message.
*/
public class MessageDeframer {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
static final byte DATA_BYTE = (byte) 0x00;
// TODO: fix this code to be able to handle upto 4GB input size.
private int mLength = 0;
private int mReadSoFar = 0;
private ArrayList<byte[]> mFrames = new ArrayList<>();
private byte[] mMsg = null;
private int mNumFrames;
byte[] getMessageBytes() {
return mMsg;
}
int getLength() {
return mLength;
}
int getNumberOfFrames() {
return mNumFrames;
}
/**
* Reads the bytes from the given InputStream and populates bytes in
* {@link #mMsg}
*/
public boolean processInput(InputStream in, MessageUtils.ContentType contentType) {
byte[] inBytes;
try {
InputStream inStream = (contentType == ContentType.GRPC_WEB_TEXT) ? Base64.getDecoder().wrap(in) : in;
inBytes = IOUtils.getBytes(inStream);
} catch (IOException e) {
e.printStackTrace();
logger.warn("invalid input");
return false;
}
if (inBytes.length < 5) {
logger.debug("invalid input. Expected minimum of 5 bytes");
return false;
}
while (getNextFrameBytes(inBytes)) {
}
mNumFrames = mFrames.size();
// common case is only one frame.
if (mNumFrames == 1) {
mMsg = mFrames.get(0);
} else {
// concatenate all frames into one byte array
// TODO: this is inefficient.
mMsg = new byte[mLength];
int offset = 0;
for (byte[] f : mFrames) {
System.arraycopy(f, 0, mMsg, offset, f.length);
offset += f.length;
}
mFrames = null;
}
return true;
}
/** returns true if the next frame is a DATA frame */
private boolean getNextFrameBytes(byte[] inBytes) {
// Firstbyte should be 0x00 (for this to be a DATA frame)
int firstByteValue = inBytes[mReadSoFar] | DATA_BYTE;
if (firstByteValue != 0) {
logger.debug("done with DATA bytes");
return false;
}
// Next 4 bytes = length of the bytes array starting after the 4 bytes.
int offset = mReadSoFar + 1;
int len = ByteBuffer.wrap(inBytes, offset, 4).getInt();
// Empty message is special case.
// TODO: Can this is special handling be removed?
if (len == 0) {
mFrames.add(new byte[0]);
return false;
}
// Make sure we have enough bytes in the inputstream
int expectedNumBytes = len + 5 + mReadSoFar;
if (inBytes.length < expectedNumBytes) {
logger.warn(String.format("input doesn't have enough bytes. expected: %d, found %d", expectedNumBytes,
inBytes.length));
return false;
}
// Read "len" bytes into message
mLength += len;
offset += 4;
byte[] inputBytes = Arrays.copyOfRange(inBytes, offset, len + offset);
mFrames.add(inputBytes);
mReadSoFar += (len + 5);
// we have more frames to process, if there are bytes unprocessed
return inBytes.length > mReadSoFar;
}
}

@ -0,0 +1,43 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taobao.arthas.grpcweb.proxy;
/**
* Creates frames from the input bytes.
*/
public class MessageFramer {
public enum Type {
DATA ((byte) 0x00),
TRAILER ((byte) 0x80);
public final byte value;
Type(byte b) {
value = b;
}
}
// TODO: handle more than single frame; i.e., input byte array size > (2GB - 1)
public byte[] getPrefix(byte[] in, Type type) {
int len = in.length;
return new byte[] {
type.value,
(byte) ((len >> 24) & 0xff),
(byte) ((len >> 16) & 0xff),
(byte) ((len >> 8) & 0xff),
(byte) ((len >> 0) & 0xff),
};
}
}

@ -0,0 +1,82 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taobao.arthas.grpcweb.proxy;
import com.google.common.annotations.VisibleForTesting;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
public class MessageUtils {
@VisibleForTesting
public
enum ContentType {
GRPC_WEB_BINARY, GRPC_WEB_TEXT;
}
private static Map<String, ContentType> GRPC_GCP_CONTENT_TYPES = new HashMap<String, ContentType>() {
{
put("application/grpc-web", ContentType.GRPC_WEB_BINARY);
put("application/grpc-web+proto", ContentType.GRPC_WEB_BINARY);
put("application/grpc-web-text", ContentType.GRPC_WEB_TEXT);
put("application/grpc-web-text+proto", ContentType.GRPC_WEB_TEXT);
}
};
/**
* Validate the content-type
*/
public static ContentType validateContentType(String contentType) throws IllegalArgumentException {
if (contentType == null || !GRPC_GCP_CONTENT_TYPES.containsKey(contentType)) {
throw new IllegalArgumentException("This content type is not used for grpc-web: " + contentType);
}
return getContentType(contentType);
}
static ContentType getContentType(String type) {
return GRPC_GCP_CONTENT_TYPES.get(type);
}
/**
* Find the input arg protobuf class for the given rpc-method. Convert the given
* bytes to the input protobuf. return that.
*/
static Object getInputProtobufObj(Method rpcMethod, byte[] in) {
Class[] inputArgs = rpcMethod.getParameterTypes();
Class inputArgClass = inputArgs[0];
// use the inputArg classtype to create a protobuf object
Method parseFromObj;
try {
parseFromObj = inputArgClass.getMethod("parseFrom", byte[].class);
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Couldn't find method in 'parseFrom' in " + inputArgClass.getName());
}
Object inputObj;
try {
inputObj = parseFromObj.invoke(null, in);
} catch (InvocationTargetException | IllegalAccessException e) {
throw new IllegalArgumentException(e);
}
if (inputObj == null || !inputArgClass.isInstance(inputObj)) {
throw new IllegalArgumentException("Input obj is **not** instance of the correct input class type");
}
return inputObj;
}
}

@ -0,0 +1,80 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taobao.arthas.grpcweb.proxy;
import io.grpc.Metadata;
import io.netty.handler.codec.http.HttpHeaders;
import java.util.*;
class MetadataUtil {
private static final String BINARY_HEADER_SUFFIX = "-bin";
private static final String GRPC_HEADER_PREFIX = "x-grpc-";
private static final List<String> EXCLUDED = Arrays.asList("x-grpc-web", "content-type", "grpc-accept-encoding",
"grpc-encoding");
static Metadata getHtpHeaders(HttpHeaders headers) {
Metadata httpHeaders = new Metadata();
Set<String> headerNames = headers.names();
if (headerNames == null) {
return httpHeaders;
}
// copy all headers "x-grpc-*" into Metadata
// TODO: do we need to copy all "x-*" headers instead?
for (String headerName : headerNames) {
if (EXCLUDED.contains(headerName.toLowerCase())) {
continue;
}
if (headerName.toLowerCase().startsWith(GRPC_HEADER_PREFIX)) {
// Get all the values of this header.
List<String> values = headers.getAll(headerName);
if (values != null) {
// Java enumerations have klunky API. lets convert to a list.
// this will be a short list usually.
for (String s : values) {
if (headerName.toLowerCase().endsWith(BINARY_HEADER_SUFFIX)) {
// Binary header
httpHeaders.put(Metadata.Key.of(headerName, Metadata.BINARY_BYTE_MARSHALLER), s.getBytes());
} else {
// String header
httpHeaders.put(Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER), s);
}
}
}
}
}
return httpHeaders;
}
static Map<String, String> getHttpHeadersFromMetadata(Metadata trailer) {
Map<String, String> map = new HashMap<>();
for (String key : trailer.keys()) {
if (EXCLUDED.contains(key.toLowerCase())) {
continue;
}
if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
// TODO allow any object type here
byte[] value = trailer.get(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
map.put(key, new String(value));
} else {
String value = trailer.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
map.put(key, value);
}
}
return map;
}
}

@ -0,0 +1,186 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taobao.arthas.grpcweb.proxy;
import com.taobao.arthas.grpcweb.proxy.MessageUtils.ContentType;
import io.grpc.Metadata;
import io.grpc.Status;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedStream;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.util.Base64;
import java.util.Map;
/**
* <pre>
* * https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md
* * https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
*
* grpc-web HTTP chunk grpc
*
* grpc-web http1.1 Response
* 1. headers , status 200
* 2. data chunk
* 3. trailer chunk , grpc grpc-status, grpc-message
*
* </pre>
*
* @author hengyunabc 2023-09-06
*
*/
class SendGrpcWebResponse {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
private final String contentType;
/**
* http1.1 header
*/
private boolean isHeaderSent = false;
/**
* grpc message HTTP Chunk Chunk Chunk
*/
private boolean isEndChunkSent = false;
/**
* grpc DATA trailer HTTP Chunk
*/
private boolean isTrailerSent = false;
private ChannelHandlerContext ctx;
SendGrpcWebResponse(ChannelHandlerContext ctx, FullHttpRequest req) {
HttpHeaders headers = req.headers();
contentType = headers.get(HttpHeaderNames.CONTENT_TYPE);
this.ctx = ctx;
}
synchronized void writeHeaders(Metadata headers) {
if (isHeaderSent) {
return;
}
// 发送 http1.1 开头部分的内容
DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType).set(HttpHeaderNames.TRANSFER_ENCODING,
"chunked");
CorsUtils.updateCorsHeader(response.headers());
if (headers != null) {
Map<String, String> ht = MetadataUtil.getHttpHeadersFromMetadata(headers);
for (String key : ht.keySet()) {
response.headers().set(key, ht.get(key));
}
}
logger.debug("write headers: {}", response);
ctx.writeAndFlush(response);
isHeaderSent = true;
}
synchronized void returnUnimplementedStatusCode(String className) {
writeHeaders(null);
writeTrailer(
Status.UNIMPLEMENTED.withDescription("Can not find service impl, check dep, service: " + className),
null);
}
// 发送最后的 http chunked 空块
private void writeEndChunk() {
if (isEndChunkSent) {
return;
}
LastHttpContent end = new DefaultLastHttpContent();
ctx.writeAndFlush(end);
isEndChunkSent = true;
}
synchronized void writeError(Status s) {
writeHeaders(null);
writeTrailer(s, null);
}
synchronized void writeTrailer(Status status, Metadata trailer) {
if (isTrailerSent) {
return;
}
StringBuffer sb = new StringBuffer();
if (trailer != null) {
Map<String, String> ht = MetadataUtil.getHttpHeadersFromMetadata(trailer);
for (String key : ht.keySet()) {
sb.append(String.format("%s:%s\r\n", key, ht.get(key)));
}
}
sb.append(String.format("grpc-status:%d\r\n", status.getCode().value()));
if (status.getDescription() != null && !status.getDescription().isEmpty()) {
sb.append(String.format("grpc-message:%s\r\n", status.getDescription()));
}
writeResponse(sb.toString().getBytes(), MessageFramer.Type.TRAILER);
isTrailerSent = true;
writeEndChunk();
}
synchronized void writeResponse(byte[] out) {
writeResponse(out, MessageFramer.Type.DATA);
}
private void writeResponse(byte[] out, MessageFramer.Type type) {
if (isTrailerSent) {
logger.error("grpcweb trailer sented, writeResponse can not be called, framer type: {}", type);
return;
}
try {
// PUNT multiple frames not handled
byte[] prefix = new MessageFramer().getPrefix(out, type);
ByteArrayOutputStream oStream = new ByteArrayOutputStream();
// binary encode if it is "text" content type
if (MessageUtils.getContentType(contentType) == ContentType.GRPC_WEB_TEXT) {
byte[] concated = new byte[out.length + 5];
System.arraycopy(prefix, 0, concated, 0, 5);
System.arraycopy(out, 0, concated, 5, out.length);
oStream.write(Base64.getEncoder().encode(concated));
} else {
oStream.write(prefix);
oStream.write(out);
}
byte[] byteArray = oStream.toByteArray();
InputStream dataStream = new ByteArrayInputStream(byteArray);
ChunkedStream chunkedStream = new ChunkedStream(dataStream);
SingleHttpChunkedInput httpChunkedInput = new SingleHttpChunkedInput(chunkedStream);
ctx.writeAndFlush(httpChunkedInput);
} catch (IOException e) {
logger.error("write grpcweb response error, framer type: {}", type, e);
}
}
}

@ -0,0 +1,100 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.taobao.arthas.grpcweb.proxy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedInput;
/**
* LastHttpContent LastHttpContent.EMPTY_LAST_CONTENT
*
* @see LastHttpContent
* @see LastHttpContent#EMPTY_LAST_CONTENT
*/
public class SingleHttpChunkedInput implements ChunkedInput<HttpContent> {
private final ChunkedInput<ByteBuf> input;
/**
* Creates a new instance using the specified input.
* @param input {@link ChunkedInput} containing data to write
*/
public SingleHttpChunkedInput(ChunkedInput<ByteBuf> input) {
this.input = input;
// lastHttpContent = LastHttpContent.EMPTY_LAST_CONTENT;
}
/**
* Creates a new instance using the specified input. {@code lastHttpContent} will be written as the terminating
* chunk.
* @param input {@link ChunkedInput} containing data to write
* @param lastHttpContent {@link LastHttpContent} that will be written as the terminating chunk. Use this for
* training headers.
*/
public SingleHttpChunkedInput(ChunkedInput<ByteBuf> input, LastHttpContent lastHttpContent) {
this.input = input;
// this.lastHttpContent = lastHttpContent;
}
@Override
public boolean isEndOfInput() throws Exception {
if (input.isEndOfInput()) {
// Only end of input after last HTTP chunk has been sent
return true;
} else {
return false;
}
}
@Override
public void close() throws Exception {
input.close();
}
@Deprecated
@Override
public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception {
return readChunk(ctx.alloc());
}
@Override
public HttpContent readChunk(ByteBufAllocator allocator) throws Exception {
if (input.isEndOfInput()) {
return null;
} else {
ByteBuf buf = input.readChunk(allocator);
if (buf == null) {
return null;
}
return new DefaultHttpContent(buf);
}
}
@Override
public long length() {
return input.length();
}
@Override
public long progress() {
return input.progress();
}
}

@ -0,0 +1,54 @@
package com.taobao.arthas.grpcweb.proxy.server;
import com.taobao.arthas.grpcweb.proxy.GrpcServiceConnectionManager;
import com.taobao.arthas.grpcweb.proxy.GrpcWebRequestHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class GrpcWebProxyHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
private GrpcWebRequestHandler requestHandler;
private static GrpcServiceConnectionManager manager;
public GrpcWebProxyHandler(int grpcPort) {
manager = new GrpcServiceConnectionManager(grpcPort);
requestHandler = new GrpcWebRequestHandler(manager);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
logger.debug("http request: {} ", request);
send100Continue(ctx);
requestHandler.handle(ctx, request);
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, CONTINUE, Unpooled.EMPTY_BUFFER);
ctx.write(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("grpc web proxy handler error", cause);
ctx.close();
}
}

@ -0,0 +1,73 @@
package com.taobao.arthas.grpcweb.proxy.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
public final class GrpcWebProxyServer {
private static final Logger logger = LoggerFactory.getLogger(GrpcWebProxyServer.class);
private int port;
private int grpcPort;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private Channel channel;
public GrpcWebProxyServer(int port, int grpcPort) {
this.port = port;
this.grpcPort = grpcPort;
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
}
public void start() {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new GrpcWebProxyServerInitializer(grpcPort));
channel = serverBootstrap.bind(port).sync().channel();
logger.info("grpc web proxy server started, listening on " + port);
System.out.println("grpc web proxy server started, listening on " + port);
channel.closeFuture().sync();
} catch (InterruptedException e) {
logger.info("fail to start grpc web proxy server!");
throw new RuntimeException(e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public void close() {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if(workerGroup != null){
workerGroup.shutdownGracefully();
}
logger.info("success to close grpc web proxy server!");
}
public int actualPort() {
int boundPort = ((InetSocketAddress) channel.localAddress()).getPort();
return boundPort;
}
}

@ -0,0 +1,26 @@
package com.taobao.arthas.grpcweb.proxy.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
public class GrpcWebProxyServerInitializer extends ChannelInitializer<SocketChannel> {
private int grpcPort;
public GrpcWebProxyServerInitializer(int grpcPort) {
this.grpcPort = grpcPort;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new GrpcWebProxyHandler(grpcPort));
}
}

@ -0,0 +1,16 @@
package com.taobao.arthas.grpcweb.proxy.server;
import com.taobao.arthas.grpcweb.proxy.CorsUtils;
import io.netty.handler.codec.http.*;
import org.junit.Test;
public class CorsUtilsTest {
@Test
public void test(){
DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
CorsUtils.updateCorsHeader(response.headers());
System.out.println(response.headers());
}
}

@ -0,0 +1,194 @@
package com.taobao.arthas.grpcweb.proxy.server;
import grpc.gateway.testing.Echo;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import com.taobao.arthas.common.SocketUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Base64;
public class GrpcWebProxyServerTest {
private int GRPC_WEB_PROXY_PORT;
private int GRPC_PORT;
private String hostName;
private CloseableHttpClient httpClient;
@Before
public void startServer(){
GRPC_WEB_PROXY_PORT = SocketUtils.findAvailableTcpPort();
GRPC_PORT = SocketUtils.findAvailableTcpPort();
// 启动grpc服务
Thread grpcStart = new Thread(() -> {
StartGrpcTest startGrpcTest = new StartGrpcTest(GRPC_PORT);
startGrpcTest.startGrpcService();
});
grpcStart.start();
// 启动grpc-web-proxy服务
Thread grpcWebProxyStart = new Thread(() -> {
StartGrpcWebProxyTest startGrpcWebProxyTest = new StartGrpcWebProxyTest(GRPC_WEB_PROXY_PORT,GRPC_PORT);
startGrpcWebProxyTest.startGrpcWebProxy();
});
grpcWebProxyStart.start();
try {
// waiting for the server to start
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
hostName = "http://127.0.0.1:" + GRPC_WEB_PROXY_PORT;
httpClient = HttpClients.createDefault();
}
@Test
public void simpleReqTest() {
// 单个response
String url = hostName +"/grpc.gateway.testing.EchoService/Echo";
String requestStr = "hello world!!!";
Echo.EchoRequest request = Echo.EchoRequest.newBuilder().setMessage(requestStr).build();
System.out.println("request message--->" + requestStr);
byte[] requestData = request.toByteArray();
requestData = ByteArrayWithLengthExample(requestData);
// 编码请求载荷为gRPC-Web格式
String encodedPayload = Base64.getEncoder().encodeToString(requestData);
try {
String result = "";
String encoding = "utf-8";
HttpPost httpPost = getPost(url, encodedPayload, encoding);
//发送请求,并拿到结果(同步阻塞)
CloseableHttpResponse response = httpClient.execute(httpPost);
//获取返回结果
HttpEntity entity = response.getEntity();
if (entity != null) {
//按指定编码转换结果实体为String类型
result = EntityUtils.toString(entity, encoding);
}
EntityUtils.consume(entity);
//释放Http请求链接
response.close();
System.out.println("result-->" + result);
System.out.println("after decode...");
// gAAAAA9ncnBjLXN0YXR1czowDQo= 是结尾字符
int endStartIndex = result.indexOf("gAAAAA");
String data = result.substring(0,endStartIndex);
String end = result.substring(endStartIndex,result.length());
byte[] decodedData = Base64.getDecoder().decode(data);
byte[] decodedEnd = Base64.getDecoder().decode(end);
// 去掉前5个byte
decodedData = RemoveBytesExample(decodedData);
decodedEnd = RemoveBytesExample(decodedEnd);
Echo.EchoResponse echoResponse = Echo.EchoResponse.parseFrom(decodedData);
System.out.println("response message--->" + echoResponse.getMessage());
String endStr = new String(decodedEnd);
System.out.println(endStr);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void streamReqTest() {
// stream response
String url = hostName + "/grpc.gateway.testing.EchoService/ServerStreamingEcho";
String requestStr = "hello world!!!";
Echo.ServerStreamingEchoRequest request = Echo.ServerStreamingEchoRequest.newBuilder().setMessage(requestStr)
.setMessageCount(5)
.build();
byte[] requestData = request.toByteArray();
requestData = ByteArrayWithLengthExample(requestData);
// 编码请求载荷为gRPC-Web格式
String encodedPayload = Base64.getEncoder().encodeToString(requestData);
try {
String encoding = "utf-8";
HttpPost httpPost = getPost(url, encodedPayload, encoding);
//发送请求
CloseableHttpResponse response = httpClient.execute(httpPost);
//获取返回结果
HttpEntity entity = response.getEntity();
if (entity != null) {
try (InputStream inputStream = entity.getContent()) {
// 在这里使用 inputStream 流式处理响应内容
// 例如,逐行读取响应内容
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
// 处理读取的数据
String result = new String(buffer, 0, bytesRead);
System.out.println("result-->" + result);
System.out.println("after decode...");
// gAAAAA9ncnBjLXN0YXR1czowDQo= 是结尾字符
byte[] decodedData = Base64.getDecoder().decode(result);
// 去掉前5个byte
decodedData = RemoveBytesExample(decodedData);
if(result.startsWith("gAAAAA")){
String end = new String(decodedData);
System.out.println(end);
}else {
Echo.ServerStreamingEchoResponse echoResponse = Echo.ServerStreamingEchoResponse.parseFrom(decodedData);
System.out.println("response message--->" + echoResponse.getMessage());
}
}
}
}
EntityUtils.consume(entity);
//释放Http请求链接
response.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public HttpPost getPost(String url, String param, String encoding) throws IOException {
System.out.println("request param(encode)--->" + param);
//创建post方式请求对象
HttpPost httpPost = new HttpPost (url);
//设置请求参数实体
StringEntity reqParam = new StringEntity(param,encoding);
reqParam.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, "application/grpc-web-text"));
// 将请求参数放到请求对象中
httpPost.setEntity(reqParam);
//设置请求报文头信息
httpPost.setHeader("Connection","keep-alive");
httpPost.setHeader("Accept", "application/grpc-web-text");
httpPost.setHeader("Content-type", "application/grpc-web-text");//设置发送表单请求
httpPost.setHeader("X-Grpc-Web","1");
httpPost.setHeader("X-User-Agent", "grpc-web-javascript/0.1");
httpPost.setHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36");
return httpPost;
}
public byte[] ByteArrayWithLengthExample(byte[] data){
// 添加长度信息,用于编码过程
int length = data.length;
byte[] newData = {0,0,0,0,(byte) length};
byte[] combineArray = new byte[newData.length + data.length];
System.arraycopy(newData, 0, combineArray, 0, newData.length);
System.arraycopy(data, 0, combineArray, newData.length, data.length);
return combineArray;
}
public byte[] RemoveBytesExample(byte[] data){
// 去掉长度信息,用于解码过程
byte[] result = Arrays.copyOfRange(data, 5, data.length);
return result;
}
}

@ -0,0 +1,33 @@
package com.taobao.arthas.grpcweb.proxy.server;
import com.taobao.arthas.grpcweb.proxy.MessageDeframer;
import com.taobao.arthas.grpcweb.proxy.MessageUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.junit.Assert;
import org.junit.Test;
import java.io.InputStream;
import java.util.Arrays;
public class MessageDeframerTest {
@Test
public void testProcessInput(){
String str = "AAAAAAcKBWhlbGxv";
ByteBuf content = Unpooled.copiedBuffer(str, CharsetUtil.UTF_8);
InputStream in = new ByteBufInputStream(content);
String contentTypeStr = "application/grpc-web-text";
MessageUtils.ContentType contentType = MessageUtils.validateContentType(contentTypeStr);
MessageDeframer deframer = new MessageDeframer();
boolean result = deframer.processInput(in, contentType);
Assert.assertTrue(result);
}
}

@ -0,0 +1,33 @@
package com.taobao.arthas.grpcweb.proxy.server;
import com.taobao.arthas.grpcweb.proxy.MessageUtils;
import org.junit.Assert;
import org.junit.Test;
public class MessageUtilsTest {
@Test
public void testValidateContentType(){
String contentType1 = "application/grpc-web";
MessageUtils.ContentType result1 = MessageUtils.validateContentType(contentType1);
String contentType2 = "application/grpc-web+proto";
MessageUtils.ContentType result2 = MessageUtils.validateContentType(contentType2);
String contentType3 = "application/grpc-web-text";
MessageUtils.ContentType result3 = MessageUtils.validateContentType(contentType3);
String contentType4 = "application/grpc-web-text+proto";
MessageUtils.ContentType result4 = MessageUtils.validateContentType(contentType4);
MessageUtils.ContentType result5 = MessageUtils.ContentType.GRPC_WEB_BINARY;
try {
String contentType5 = null;
result5 = MessageUtils.validateContentType(contentType5);
}catch (IllegalArgumentException e){
result5 = null;
}
Assert.assertEquals(result1,MessageUtils.ContentType.GRPC_WEB_BINARY);
Assert.assertEquals(result2,MessageUtils.ContentType.GRPC_WEB_BINARY);
Assert.assertEquals(result3,MessageUtils.ContentType.GRPC_WEB_TEXT);
Assert.assertEquals(result4,MessageUtils.ContentType.GRPC_WEB_TEXT);
Assert.assertNull(result5);
}
}

@ -0,0 +1,32 @@
package com.taobao.arthas.grpcweb.proxy.server;
import com.taobao.arthas.grpcweb.proxy.server.grpcService.EchoImpl;
import com.taobao.arthas.grpcweb.proxy.server.grpcService.GreeterService;
import com.taobao.arthas.grpcweb.proxy.server.grpcService.HelloImpl;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class StartGrpcTest {
private int GRPC_PORT;
public StartGrpcTest(int grpcPort){
this.GRPC_PORT = grpcPort;
}
public void startGrpcService(){
try {
Server grpcServer = ServerBuilder.forPort(GRPC_PORT).addService((BindableService) new GreeterService())
.addService((BindableService) new HelloImpl()).addService(new EchoImpl()).build();
grpcServer.start();
System.out.println("started gRPC server on port # " + GRPC_PORT);
System.in.read();
} catch (IOException e) {
System.out.println("fail to start gRPC server");
throw new RuntimeException(e);
}
}
}

@ -0,0 +1,19 @@
package com.taobao.arthas.grpcweb.proxy.server;
public class StartGrpcWebProxyTest {
private int GRPC_WEB_PROXY_PORT;
private int GRPC_PORT;
public StartGrpcWebProxyTest(int grpcWebPort, int grpcPort){
this.GRPC_WEB_PROXY_PORT = grpcWebPort;
this.GRPC_PORT = grpcPort;
}
public void startGrpcWebProxy(){
GrpcWebProxyServer grpcWebProxyServer = new GrpcWebProxyServer(GRPC_WEB_PROXY_PORT, GRPC_PORT);
grpcWebProxyServer.start();
}
}

@ -0,0 +1,81 @@
package com.taobao.arthas.grpcweb.proxy.server.grpcService;
import grpc.gateway.testing.Echo.*;
import grpc.gateway.testing.EchoServiceGrpc.EchoServiceImplBase;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
public class EchoImpl extends EchoServiceImplBase {
@Override
public void echo(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
String message = request.getMessage();
responseObserver.onNext(EchoResponse.newBuilder().setMessage(message).setMessageCount(1).build());
responseObserver.onCompleted();
}
@Override
public void echoAbort(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
// TODO Auto-generated method stub
responseObserver.onNext(EchoResponse.newBuilder().setMessage(request.getMessage()).build());
Metadata trailers = new Metadata();
Key<String> customKey = Key.of("custom-key", Metadata.ASCII_STRING_MARSHALLER);
// 添加自定义元数据
trailers.put(customKey, "custom-value");
responseObserver.onError(Status.ABORTED.withDescription("error desc").asException(trailers));
}
@Override
public void noOp(Empty request, StreamObserver<Empty> responseObserver) {
// TODO Auto-generated method stub
super.noOp(request, responseObserver);
}
@Override
public void serverStreamingEcho(ServerStreamingEchoRequest request,
StreamObserver<ServerStreamingEchoResponse> responseObserver) {
String message = request.getMessage();
int messageCount = request.getMessageCount();
System.err.println(message);
for (int i = 0; i < messageCount; ++i) {
responseObserver.onNext(ServerStreamingEchoResponse.newBuilder().setMessage(message).build());
}
responseObserver.onCompleted();
}
@Override
public void serverStreamingEchoAbort(ServerStreamingEchoRequest request,
StreamObserver<ServerStreamingEchoResponse> responseObserver) {
// TODO Auto-generated method stub
super.serverStreamingEchoAbort(request, responseObserver);
}
@Override
public StreamObserver<ClientStreamingEchoRequest> clientStreamingEcho(
StreamObserver<ClientStreamingEchoResponse> responseObserver) {
// TODO Auto-generated method stub
return super.clientStreamingEcho(responseObserver);
}
@Override
public StreamObserver<EchoRequest> fullDuplexEcho(StreamObserver<EchoResponse> responseObserver) {
// TODO Auto-generated method stub
return super.fullDuplexEcho(responseObserver);
}
@Override
public StreamObserver<EchoRequest> halfDuplexEcho(StreamObserver<EchoResponse> responseObserver) {
// TODO Auto-generated method stub
return super.halfDuplexEcho(responseObserver);
}
}

@ -0,0 +1,34 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taobao.arthas.grpcweb.proxy.server.grpcService;
import grpcweb.examples.greeter.GreeterGrpc;
import grpcweb.examples.greeter.GreeterOuterClass.HelloReply;
import grpcweb.examples.greeter.GreeterOuterClass.HelloRequest;
import io.grpc.stub.StreamObserver;
public class GreeterService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
System.out.println("Greeter Service responding in sayhello() method");
// throw new RuntimeException("xxxxxx");
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

@ -0,0 +1,40 @@
package com.taobao.arthas.grpcweb.proxy.server.grpcService;
import helloworld.GreeterGrpc.GreeterImplBase;
import helloworld.Helloworld.HelloReply;
import helloworld.Helloworld.HelloRequest;
import helloworld.Helloworld.RepeatHelloRequest;
import io.grpc.stub.StreamObserver;
public class HelloImpl extends GreeterImplBase{
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
// TODO Auto-generated method stub
// super.sayHello(request, responseObserver);
System.err.println("sayHello");
// throw new RuntimeException("eeee");
responseObserver.onNext(HelloReply.newBuilder().setMessage("xxxx").build());
responseObserver.onCompleted();
}
@Override
public void sayRepeatHello(RepeatHelloRequest request, StreamObserver<HelloReply> responseObserver) {
// TODO Auto-generated method stub
// super.sayRepeatHello(request, responseObserver);
System.err.println("sayRepeatHello eeee ");
// throw new RuntimeException("eeee");
responseObserver.onNext(HelloReply.newBuilder().setMessage("xxxx").build());
responseObserver.onCompleted();
}
}

@ -0,0 +1,100 @@
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package grpc.gateway.testing;
message Empty {}
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
int32 message_count = 2;
}
// Request type for server side streaming echo.
message ServerStreamingEchoRequest {
// Message string for server streaming request.
string message = 1;
// The total number of messages to be generated before the server
// closes the stream; default is 10.
int32 message_count = 2;
// The interval (ms) between two server messages. The server implementation
// may enforce some minimum interval (e.g. 100ms) to avoid message overflow.
int32 message_interval = 3;
}
// Response type for server streaming response.
message ServerStreamingEchoResponse {
// Response message.
string message = 1;
}
// Request type for client side streaming echo.
message ClientStreamingEchoRequest {
// A special value "" indicates that there's no further messages.
string message = 1;
}
// Response type for client side streaming echo.
message ClientStreamingEchoResponse {
// Total number of client messages that have been received.
int32 message_count = 1;
}
// A simple echo service.
service EchoService {
// One request followed by one response
// The server returns the client message as-is.
rpc Echo(EchoRequest) returns (EchoResponse);
// Sends back abort status.
rpc EchoAbort(EchoRequest) returns (EchoResponse) {}
// One empty request, ZERO processing, followed by one empty response
// (minimum effort to do message serialization).
rpc NoOp(Empty) returns (Empty);
// One request followed by a sequence of responses (streamed download).
// The server will return the same client message repeatedly.
rpc ServerStreamingEcho(ServerStreamingEchoRequest)
returns (stream ServerStreamingEchoResponse);
// One request followed by a sequence of responses (streamed download).
// The server abort directly.
rpc ServerStreamingEchoAbort(ServerStreamingEchoRequest)
returns (stream ServerStreamingEchoResponse) {}
// A sequence of requests followed by one response (streamed upload).
// The server returns the total number of messages as the result.
rpc ClientStreamingEcho(stream ClientStreamingEchoRequest)
returns (ClientStreamingEchoResponse);
// A sequence of requests with each message echoed by the server immediately.
// The server returns the same client messages in order.
// E.g. this is how the speech API works.
rpc FullDuplexEcho(stream EchoRequest) returns (stream EchoResponse);
// A sequence of requests followed by a sequence of responses.
// The server buffers all the client messages and then returns the same
// client messages one by one after the client half-closes the stream.
// This is how an image recognition API may work.
rpc HalfDuplexEcho(stream EchoRequest) returns (stream EchoResponse);
}

@ -0,0 +1,45 @@
// Copyright 2020 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =======================================
//
// DO NOT EDIT
// this is copy of
// https://github.com/grpc/grpc-web/blob/master/net/grpc/gateway/
// examples/helloworld/helloworld.proto
//
// TODO: can the original be directly used without making copy here
// =======================================
syntax = "proto3";
option java_package = "grpcweb.examples.greeter";
package grpcweb.examples.greeter;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}

@ -0,0 +1,37 @@
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package helloworld;
service Greeter {
// unary call
rpc SayHello(HelloRequest) returns (HelloReply);
// server streaming call
rpc SayRepeatHello(RepeatHelloRequest) returns (stream HelloReply);
}
message HelloRequest {
string name = 1;
}
message RepeatHelloRequest {
string name = 1;
int32 count = 2;
}
message HelloReply {
string message = 1;
}

@ -75,6 +75,7 @@
<module>testcase</module>
<module>site</module>
<module>packaging</module>
<module>arthas-grpc-web-proxy</module>
</modules>
<properties>

Loading…
Cancel
Save