diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcBootstrap.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcBootstrap.java index c953ffc95..02d5d9af4 100644 --- a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcBootstrap.java +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcBootstrap.java @@ -7,7 +7,7 @@ package com.taobao.arthas.grpc.server; */ public class ArthasGrpcBootstrap { public static void main(String[] args) { - ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(9090, null); + ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(9091, null); arthasGrpcServer.start(); } } diff --git a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcServer.java b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcServer.java index a931f5428..81080c73a 100644 --- a/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcServer.java +++ b/labs/arthas-grpc-server/src/main/java/com/taobao/arthas/grpc/server/ArthasGrpcServer.java @@ -30,7 +30,7 @@ public class ArthasGrpcServer { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName()); - private int port = 9090; + private int port = 9091; private String grpcServicePackageName; diff --git a/labs/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java b/labs/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java index 0181a9425..5af03515d 100644 --- a/labs/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java +++ b/labs/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java @@ -2,22 +2,24 @@ package unittest.grpc; import arthas.grpc.unittest.ArthasUnittest; import arthas.grpc.unittest.ArthasUnittestServiceGrpc; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; import com.taobao.arthas.grpc.server.ArthasGrpcServer; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; +import org.slf4j.LoggerFactory; +import java.lang.invoke.MethodHandles; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -27,50 +29,59 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class GrpcTest { private static final String HOST = "localhost"; - private static final int PORT = 9090; + private static final int PORT = 9092; private static final String HOST_PORT = HOST + ":" + PORT; private static final String UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME = "unittest.grpc.service.impl"; - private ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub blockingStub = null; + private static final Logger log = (Logger) LoggerFactory.getLogger(GrpcTest.class); + private ManagedChannel clientChannel; Random random = new Random(); ExecutorService threadPool = Executors.newFixedThreadPool(10); + @Before public void startServer() { + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + Logger rootLogger = loggerContext.getLogger("ROOT"); + + rootLogger.setLevel(Level.INFO); + Thread grpcWebProxyStart = new Thread(() -> { ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(PORT, UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME); arthasGrpcServer.start(); }); grpcWebProxyStart.start(); + + clientChannel = ManagedChannelBuilder.forTarget(HOST_PORT) + .usePlaintext() + .build(); } @Test public void testUnary() { - ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT) - .usePlaintext() - .build(); + log.info("testUnary start!"); - ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel); + + ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(clientChannel); try { ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("unaryInvoke").build(); ArthasUnittest.ArthasUnittestResponse res = stub.unary(request); System.out.println(res.getMessage()); } finally { - channel.shutdownNow(); + clientChannel.shutdownNow(); } + log.info("testUnary success!"); } @Test public void testUnarySum() throws InterruptedException { - ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT) - .usePlaintext() - .build(); + log.info("testUnarySum start!"); - ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel); + ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(clientChannel); for (int i = 0; i < 10; i++) { AtomicInteger sum = new AtomicInteger(0); int finalId = i; - for (int j = 0; j < 100; j++) { + for (int j = 0; j < 10; j++) { int num = random.nextInt(101); sum.addAndGet(num); threadPool.submit(() -> { @@ -82,17 +93,16 @@ public class GrpcTest { System.out.println("id:" + finalId + ",sum:" + sum.get() + ",grpcSum:" + grpcSum); Assert.assertEquals(sum.get(), grpcSum); } - channel.shutdown(); + clientChannel.shutdown(); + log.info("testUnarySum success!"); } // 用于测试客户端流 @Test public void testClientStreamSum() throws Throwable { - ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) - .usePlaintext() - .build(); + log.info("testClientStreamSum start!"); - ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); + ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel); AtomicInteger sum = new AtomicInteger(0); CountDownLatch latch = new CountDownLatch(1); @@ -115,25 +125,24 @@ public class GrpcTest { } }); - for (int j = 0; j < 1000; j++) { + for (int j = 0; j < 100; j++) { int num = random.nextInt(1001); sum.addAndGet(num); clientStreamObserver.onNext(ArthasUnittest.ArthasUnittestRequest.newBuilder().setNum(num).build()); } clientStreamObserver.onCompleted(); - latch.await(); - channel.shutdown(); + latch.await(20,TimeUnit.SECONDS); + clientChannel.shutdown(); + log.info("testClientStreamSum success!"); } // 用于测试请求数据隔离性 @Test public void testDataIsolation() throws InterruptedException { - ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) - .usePlaintext() - .build(); + log.info("testDataIsolation start!"); - ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); + ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel); for (int i = 0; i < 10; i++) { threadPool.submit(() -> { AtomicInteger sum = new AtomicInteger(0); @@ -170,23 +179,22 @@ public class GrpcTest { clientStreamObserver.onCompleted(); try { - latch.await(); + latch.await(20,TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } - channel.shutdown(); + clientChannel.shutdown(); }); } - Thread.sleep(7000L); + Thread.sleep(10000L); + log.info("testDataIsolation success!"); } @Test public void testServerStream() throws InterruptedException { - ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) - .usePlaintext() - .build(); + log.info("testServerStream start!"); - ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); + ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel); ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("serverStream").build(); @@ -211,18 +219,17 @@ public class GrpcTest { } catch (InterruptedException e) { e.printStackTrace(); } finally { - channel.shutdown(); + clientChannel.shutdown(); } + log.info("testServerStream success!"); } // 用于测试双向流 @Test public void testBiStream() throws Throwable { - ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090) - .usePlaintext() - .build(); + log.info("testBiStream start!"); - ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel); + ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel); CountDownLatch latch = new CountDownLatch(1); StreamObserver biStreamObserver = stub.biStream(new StreamObserver() { @@ -251,8 +258,9 @@ public class GrpcTest { Thread.sleep(2000); biStreamObserver.onCompleted(); - latch.await(); - channel.shutdown(); + latch.await(20, TimeUnit.SECONDS); + clientChannel.shutdown(); + log.info("testBiStream success!"); } private void addSum(ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub, int id, int num) {