From 73452d9974566963442c41f3dfeba3bdda8369c1 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Mar 2016 16:23:33 +0300 Subject: [PATCH] RemoteService implemented. #434 --- src/main/java/org/redisson/Redisson.java | 9 +- .../java/org/redisson/RedissonClient.java | 8 + .../org/redisson/RedissonRemoteService.java | 152 ++++++++++++++++++ .../java/org/redisson/RemoteServiceKey.java | 68 ++++++++ .../org/redisson/RemoteServiceMethod.java | 39 +++++ .../org/redisson/RemoteServiceRequest.java | 54 +++++++ .../org/redisson/RemoteServiceResponse.java | 42 +++++ .../client/handler/CommandDecoder.java | 8 +- .../org/redisson/codec/JsonJacksonCodec.java | 15 ++ .../org/redisson/core/RRemoteService.java | 36 +++++ .../redisson/RedissonRemoteServiceTest.java | 83 ++++++++++ 11 files changed, 509 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/redisson/RedissonRemoteService.java create mode 100644 src/main/java/org/redisson/RemoteServiceKey.java create mode 100644 src/main/java/org/redisson/RemoteServiceMethod.java create mode 100644 src/main/java/org/redisson/RemoteServiceRequest.java create mode 100644 src/main/java/org/redisson/RemoteServiceResponse.java create mode 100644 src/main/java/org/redisson/core/RRemoteService.java create mode 100644 src/test/java/org/redisson/RedissonRemoteServiceTest.java diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 648beb275..096483e1d 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -58,10 +58,10 @@ import org.redisson.core.RListMultimapCache; import org.redisson.core.RLock; import org.redisson.core.RMap; import org.redisson.core.RMapCache; -import org.redisson.core.RMultimapCache; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; import org.redisson.core.RReadWriteLock; +import org.redisson.core.RRemoteService; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSemaphore; @@ -87,6 +87,7 @@ public class Redisson implements RedissonClient { private final CommandExecutor commandExecutor; private final ConnectionManager connectionManager; private final Config config; + private final RedissonRemoteService remoteService; private final UUID id = UUID.randomUUID(); @@ -114,6 +115,7 @@ public class Redisson implements RedissonClient { } commandExecutor = new CommandSyncService(connectionManager); evictionScheduler = new EvictionScheduler(commandExecutor); + remoteService = new RedissonRemoteService(this); } private void validate(SingleServerConfig config) { @@ -369,6 +371,10 @@ public class Redisson implements RedissonClient { return new RedissonScript(commandExecutor); } + public RRemoteService getRemoteSerivce() { + return remoteService; + } + @Override public RSortedSet getSortedSet(String name) { return new RedissonSortedSet(commandExecutor, name); @@ -501,6 +507,7 @@ public class Redisson implements RedissonClient { @Override public void shutdown() { + remoteService.shutdown(); connectionManager.shutdown(); } diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index 270877e9f..edb10dced 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -45,6 +45,7 @@ import org.redisson.core.RMapCache; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; import org.redisson.core.RReadWriteLock; +import org.redisson.core.RRemoteService; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSemaphore; @@ -585,6 +586,13 @@ public interface RedissonClient { */ RScript getScript(); + /** + * Returns object for remote operations + * + * @return + */ + RRemoteService getRemoteSerivce(); + /** * Return batch object which executes group of * command in pipeline. diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java new file mode 100644 index 000000000..d3dac19b7 --- /dev/null +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -0,0 +1,152 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.core.MessageListener; +import org.redisson.core.RBlockingQueue; +import org.redisson.core.RRemoteService; +import org.redisson.core.RTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBufUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.ThreadLocalRandom; + +public class RedissonRemoteService implements RRemoteService { + + private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class); + + private final Map beans = PlatformDependent.newConcurrentHashMap(); + private final Queue> futures = new ConcurrentLinkedQueue>(); + + private final Redisson redisson; + + public RedissonRemoteService(Redisson redisson) { + this.redisson = redisson; + } + + @Override + public void register(Class serviceInterface, T object) { + for (Method method : serviceInterface.getMethods()) { + RemoteServiceMethod value = new RemoteServiceMethod(method, object); + RemoteServiceKey key = new RemoteServiceKey(serviceInterface, method.getName()); + if (beans.put(key, value) != null) { + return; + } + } + + String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + subscribe(serviceInterface, requestQueue); + } + + private void subscribe(final Class serviceInterface, final RBlockingQueue requestQueue) { + Future take = requestQueue.takeAsync(); + futures.add(take); + take.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + RemoteServiceRequest request = future.getNow(); + RemoteServiceMethod method = beans.get(new RemoteServiceKey(serviceInterface, request.getMethodName())); + String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + request.getRequestId(); + RTopic topic = redisson.getTopic(responseName); + RemoteServiceResponse response; + try { + Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); + response = new RemoteServiceResponse(result); + } catch (Exception e) { + e.getCause().printStackTrace(); + response = new RemoteServiceResponse(e.getCause()); + log.error("Can't execute: " + method.getMethod().getName() + " with args: " + request.getArgs(), e); + } + + long clients = topic.publish(response); + if (clients == 0) { + log.error("None of clients has not received a response for request {}", request); + } + + futures.remove(future); + subscribe(serviceInterface, requestQueue); + } + }); + } + + @Override + public T get(final Class serviceInterface) { + InvocationHandler handler = new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + String requestId = generateRequestId(); + System.out.println(requestId); + + String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args)); + + String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + requestId; + final RTopic topic = redisson.getTopic(responseName); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference response = new AtomicReference(); + int listenerId = topic.addListener(new MessageListener() { + @Override + public void onMessage(String channel, RemoteServiceResponse msg) { + response.set(msg); + latch.countDown(); + } + }); + + latch.await(); + topic.removeListener(listenerId); + RemoteServiceResponse msg = response.get(); + if (msg.getError() != null) { + throw msg.getError(); + } + return msg.getResult(); + } + }; + return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[] {serviceInterface}, handler); + } + + private String generateRequestId() { + byte[] id = new byte[16]; + // TODO JDK UPGRADE replace to native ThreadLocalRandom + ThreadLocalRandom.current().nextBytes(id); + return ByteBufUtil.hexDump(id); + } + + public void shutdown() { + for (Future future : futures) { + future.cancel(true); + } + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceKey.java b/src/main/java/org/redisson/RemoteServiceKey.java new file mode 100644 index 000000000..5cadb3a5a --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceKey.java @@ -0,0 +1,68 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +public class RemoteServiceKey { + + private final Class serviceInterface; + private final String methodName; + + public RemoteServiceKey(Class serviceInterface, String methodName) { + super(); + this.serviceInterface = serviceInterface; + this.methodName = methodName; + } + + public String getMethodName() { + return methodName; + } + + public Class getServiceInterface() { + return serviceInterface; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((methodName == null) ? 0 : methodName.hashCode()); + result = prime * result + ((serviceInterface == null) ? 0 : serviceInterface.getName().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + RemoteServiceKey other = (RemoteServiceKey) obj; + if (methodName == null) { + if (other.methodName != null) + return false; + } else if (!methodName.equals(other.methodName)) + return false; + if (serviceInterface == null) { + if (other.serviceInterface != null) + return false; + } else if (!serviceInterface.equals(other.serviceInterface)) + return false; + return true; + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceMethod.java b/src/main/java/org/redisson/RemoteServiceMethod.java new file mode 100644 index 000000000..153c82d19 --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceMethod.java @@ -0,0 +1,39 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.lang.reflect.Method; + +public class RemoteServiceMethod { + + private final Object bean; + private final Method method; + + public RemoteServiceMethod(Method method, Object bean) { + super(); + this.method = method; + this.bean = bean; + } + + public Object getBean() { + return bean; + } + + public Method getMethod() { + return method; + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceRequest.java b/src/main/java/org/redisson/RemoteServiceRequest.java new file mode 100644 index 000000000..3934a78b7 --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceRequest.java @@ -0,0 +1,54 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Arrays; + +public class RemoteServiceRequest { + + private String requestId; + private String methodName; + private Object[] args; + + public RemoteServiceRequest() { + } + + public RemoteServiceRequest(String requestId, String methodName, Object[] args) { + super(); + this.requestId = requestId; + this.methodName = methodName; + this.args = args; + } + + public String getRequestId() { + return requestId; + } + + public Object[] getArgs() { + return args; + } + + public String getMethodName() { + return methodName; + } + + @Override + public String toString() { + return "[requestId=" + requestId + ", methodName=" + methodName + ", args=" + + Arrays.toString(args) + "]"; + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceResponse.java b/src/main/java/org/redisson/RemoteServiceResponse.java new file mode 100644 index 000000000..dd14bf06c --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceResponse.java @@ -0,0 +1,42 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +public class RemoteServiceResponse { + + private Object result; + private Throwable error; + + public RemoteServiceResponse() { + } + + public RemoteServiceResponse(Object result) { + this.result = result; + } + + public RemoteServiceResponse(Throwable error) { + this.error = error; + } + + public Throwable getError() { + return error; + } + + public Object getResult() { + return result; + } + +} diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index f7b150e9e..ac868aafd 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -84,12 +84,12 @@ public class CommandDecoder extends ReplayingDecoder { currentDecoder = StringCodec.INSTANCE.getValueDecoder(); } - if (log.isTraceEnabled()) { - log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); - } - if (state() == null) { state(new State()); + + if (log.isTraceEnabled()) { + log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); + } } state().setDecoderState(null); diff --git a/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/src/main/java/org/redisson/codec/JsonJacksonCodec.java index 711e1156f..ab518b0ed 100755 --- a/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -23,8 +23,15 @@ import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonIdentityInfo; +import com.fasterxml.jackson.annotation.JsonIdentityReference; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.ObjectIdGenerators; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.MapperFeature; @@ -32,6 +39,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder; import io.netty.buffer.ByteBuf; @@ -49,6 +57,12 @@ public class JsonJacksonCodec implements Codec { public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec(); + @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id") + @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.PUBLIC_ONLY, setterVisibility = Visibility.PUBLIC_ONLY, isGetterVisibility = Visibility.PUBLIC_ONLY) + public static class ThrowableMixIn { + + } + private final ObjectMapper mapObjectMapper = initObjectMapper(); protected ObjectMapper initObjectMapper() { @@ -111,6 +125,7 @@ public class JsonJacksonCodec implements Codec { objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true); objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); + objectMapper.addMixIn(Throwable.class, ThrowableMixIn.class); } @Override diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java new file mode 100644 index 000000000..f7066dc9d --- /dev/null +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -0,0 +1,36 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +public interface RRemoteService { + + /** + * Register object as remote service + * + * @param remoteInterface + * @param object + */ + void register(Class remoteInterface, T object); + + /** + * Get remote service object for remote invocations + * + * @param remoteInterface + * @return + */ + T get(Class remoteInterface); + +} diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java new file mode 100644 index 000000000..b6cede708 --- /dev/null +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -0,0 +1,83 @@ +package org.redisson; + +import org.junit.Assert; +import org.junit.Test; +import static org.assertj.core.api.Assertions.*; + +import java.io.IOException; + +public class RedissonRemoteServiceTest extends BaseTest { + + public interface RemoteInterface { + + void voidMethod(String name, Long param); + + Long resultMethod(Long value); + + void errorMethod() throws IOException; + + void errorMethodWithCause(); + + } + + public class RemoteImpl implements RemoteInterface { + + @Override + public void voidMethod(String name, Long param) { + System.out.println(name + " " + param); + } + + @Override + public Long resultMethod(Long value) { + return value*2; + } + + @Override + public void errorMethod() throws IOException { + throw new IOException("Checking error throw"); + } + + @Override + public void errorMethodWithCause() { + try { + int s = 2 / 0; + } catch (Exception e) { + throw new RuntimeException("Checking error throw", e); + } + } + + + } + + @Test + public void testInvocations() { + RedissonClient r1 = Redisson.create(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RedissonClient r2 = Redisson.create(); + RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class); + + ri.voidMethod("someName", 100L); + assertThat(ri.resultMethod(100L)).isEqualTo(200); + + try { + ri.errorMethod(); + Assert.fail(); + } catch (IOException e) { + assertThat(e.getMessage()).isEqualTo("Checking error throw"); + } + + try { + ri.errorMethodWithCause(); + Assert.fail(); + } catch (Exception e) { + assertThat(e.getCause()).isInstanceOf(ArithmeticException.class); + assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); + } + + + r1.shutdown(); + r2.shutdown(); + } + +}