Fix grpc unittest (#2956)

master
FengYe 2 weeks ago committed by GitHub
parent b266e1c967
commit 3307687cb4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -7,7 +7,7 @@ package com.taobao.arthas.grpc.server;
*/ */
public class ArthasGrpcBootstrap { public class ArthasGrpcBootstrap {
public static void main(String[] args) { public static void main(String[] args) {
ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(9090, null); ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(9091, null);
arthasGrpcServer.start(); arthasGrpcServer.start();
} }
} }

@ -30,7 +30,7 @@ public class ArthasGrpcServer {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());
private int port = 9090; private int port = 9091;
private String grpcServicePackageName; private String grpcServicePackageName;

@ -2,22 +2,24 @@ package unittest.grpc;
import arthas.grpc.unittest.ArthasUnittest; import arthas.grpc.unittest.ArthasUnittest;
import arthas.grpc.unittest.ArthasUnittestServiceGrpc; import arthas.grpc.unittest.ArthasUnittestServiceGrpc;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import com.taobao.arthas.grpc.server.ArthasGrpcServer; import com.taobao.arthas.grpc.server.ArthasGrpcServer;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.jupiter.api.BeforeAll; import org.slf4j.LoggerFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import java.lang.invoke.MethodHandles;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -27,50 +29,59 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class GrpcTest { public class GrpcTest {
private static final String HOST = "localhost"; private static final String HOST = "localhost";
private static final int PORT = 9090; private static final int PORT = 9092;
private static final String HOST_PORT = HOST + ":" + PORT; private static final String HOST_PORT = HOST + ":" + PORT;
private static final String UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME = "unittest.grpc.service.impl"; private static final String UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME = "unittest.grpc.service.impl";
private ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub blockingStub = null; private static final Logger log = (Logger) LoggerFactory.getLogger(GrpcTest.class);
private ManagedChannel clientChannel;
Random random = new Random(); Random random = new Random();
ExecutorService threadPool = Executors.newFixedThreadPool(10); ExecutorService threadPool = Executors.newFixedThreadPool(10);
@Before @Before
public void startServer() { public void startServer() {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
Logger rootLogger = loggerContext.getLogger("ROOT");
rootLogger.setLevel(Level.INFO);
Thread grpcWebProxyStart = new Thread(() -> { Thread grpcWebProxyStart = new Thread(() -> {
ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(PORT, UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME); ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(PORT, UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME);
arthasGrpcServer.start(); arthasGrpcServer.start();
}); });
grpcWebProxyStart.start(); grpcWebProxyStart.start();
clientChannel = ManagedChannelBuilder.forTarget(HOST_PORT)
.usePlaintext()
.build();
} }
@Test @Test
public void testUnary() { public void testUnary() {
ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT) log.info("testUnary start!");
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel);
ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(clientChannel);
try { try {
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("unaryInvoke").build(); ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("unaryInvoke").build();
ArthasUnittest.ArthasUnittestResponse res = stub.unary(request); ArthasUnittest.ArthasUnittestResponse res = stub.unary(request);
System.out.println(res.getMessage()); System.out.println(res.getMessage());
} finally { } finally {
channel.shutdownNow(); clientChannel.shutdownNow();
} }
log.info("testUnary success!");
} }
@Test @Test
public void testUnarySum() throws InterruptedException { public void testUnarySum() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT) log.info("testUnarySum start!");
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel); ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(clientChannel);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
AtomicInteger sum = new AtomicInteger(0); AtomicInteger sum = new AtomicInteger(0);
int finalId = i; int finalId = i;
for (int j = 0; j < 100; j++) { for (int j = 0; j < 10; j++) {
int num = random.nextInt(101); int num = random.nextInt(101);
sum.addAndGet(num); sum.addAndGet(num);
threadPool.submit(() -> { threadPool.submit(() -> {
@ -82,17 +93,16 @@ public class GrpcTest {
System.out.println("id:" + finalId + ",sum:" + sum.get() + ",grpcSum:" + grpcSum); System.out.println("id:" + finalId + ",sum:" + sum.get() + ",grpcSum:" + grpcSum);
Assert.assertEquals(sum.get(), grpcSum); Assert.assertEquals(sum.get(), grpcSum);
} }
channel.shutdown(); clientChannel.shutdown();
log.info("testUnarySum success!");
} }
// 用于测试客户端流 // 用于测试客户端流
@Test @Test
public void testClientStreamSum() throws Throwable { public void testClientStreamSum() throws Throwable {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) log.info("testClientStreamSum start!");
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel);
AtomicInteger sum = new AtomicInteger(0); AtomicInteger sum = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
@ -115,25 +125,24 @@ public class GrpcTest {
} }
}); });
for (int j = 0; j < 1000; j++) { for (int j = 0; j < 100; j++) {
int num = random.nextInt(1001); int num = random.nextInt(1001);
sum.addAndGet(num); sum.addAndGet(num);
clientStreamObserver.onNext(ArthasUnittest.ArthasUnittestRequest.newBuilder().setNum(num).build()); clientStreamObserver.onNext(ArthasUnittest.ArthasUnittestRequest.newBuilder().setNum(num).build());
} }
clientStreamObserver.onCompleted(); clientStreamObserver.onCompleted();
latch.await(); latch.await(20,TimeUnit.SECONDS);
channel.shutdown(); clientChannel.shutdown();
log.info("testClientStreamSum success!");
} }
// 用于测试请求数据隔离性 // 用于测试请求数据隔离性
@Test @Test
public void testDataIsolation() throws InterruptedException { public void testDataIsolation() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) log.info("testDataIsolation start!");
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
threadPool.submit(() -> { threadPool.submit(() -> {
AtomicInteger sum = new AtomicInteger(0); AtomicInteger sum = new AtomicInteger(0);
@ -170,23 +179,22 @@ public class GrpcTest {
clientStreamObserver.onCompleted(); clientStreamObserver.onCompleted();
try { try {
latch.await(); latch.await(20,TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
channel.shutdown(); clientChannel.shutdown();
}); });
} }
Thread.sleep(7000L); Thread.sleep(10000L);
log.info("testDataIsolation success!");
} }
@Test @Test
public void testServerStream() throws InterruptedException { public void testServerStream() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) log.info("testServerStream start!");
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel);
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("serverStream").build(); ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("serverStream").build();
@ -211,18 +219,17 @@ public class GrpcTest {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
channel.shutdown(); clientChannel.shutdown();
} }
log.info("testServerStream success!");
} }
// 用于测试双向流 // 用于测试双向流
@Test @Test
public void testBiStream() throws Throwable { public void testBiStream() throws Throwable {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) log.info("testBiStream start!");
.usePlaintext()
.build();
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
StreamObserver<ArthasUnittest.ArthasUnittestRequest> biStreamObserver = stub.biStream(new StreamObserver<ArthasUnittest.ArthasUnittestResponse>() { StreamObserver<ArthasUnittest.ArthasUnittestRequest> biStreamObserver = stub.biStream(new StreamObserver<ArthasUnittest.ArthasUnittestResponse>() {
@ -251,8 +258,9 @@ public class GrpcTest {
Thread.sleep(2000); Thread.sleep(2000);
biStreamObserver.onCompleted(); biStreamObserver.onCompleted();
latch.await(); latch.await(20, TimeUnit.SECONDS);
channel.shutdown(); clientChannel.shutdown();
log.info("testBiStream success!");
} }
private void addSum(ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub, int id, int num) { private void addSum(ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub, int id, int num) {

Loading…
Cancel
Save