From 9c33b274f98d358e5be81417dddf5e65ec310678 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Mar 2016 13:55:56 +0300 Subject: [PATCH 01/20] Ability to cancel BRPOP and BLPOP async command execution. #446 --- .../org/redisson/client/RedisConnection.java | 4 +- .../redisson/command/CommandAsyncService.java | 12 +++++ .../redisson/RedissonBlockingQueueTest.java | 50 ++++++++++++++++--- 3 files changed, 57 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 85ff35041..53f1c9383 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -174,8 +174,8 @@ public class RedisConnection implements RedisCommands { return closed; } - public void forceReconnect() { - channel.close(); + public ChannelFuture forceReconnectAsync() { + return channel.close(); } /** diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index d8c44d2fa..f2e63379a 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -462,6 +462,18 @@ public class CommandAsyncService implements CommandAsyncExecutor { int timeoutTime = connectionManager.getConfig().getTimeout(); if (skipTimeout.contains(details.getCommand().getName())) { Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString()); + details.getMainPromise().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isCancelled()) { + return; + } + // cancel handling for commands from skipTimeout collection + if (details.getAttemptPromise().cancel(true)) { + connection.forceReconnectAsync(); + } + } + }); if (popTimeout == 0) { return; } diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java index d28a5d3da..5fab2294d 100644 --- a/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -1,9 +1,6 @@ package org.redisson; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.*; import java.util.ArrayList; import java.util.HashSet; @@ -22,8 +19,47 @@ import org.junit.Assert; import org.junit.Test; import org.redisson.core.RBlockingQueue; +import io.netty.util.concurrent.Future; + public class RedissonBlockingQueueTest extends BaseTest { + @Test + public void testTakeAsyncCancel() { + Config config = createConfig(); + config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1); + + RedissonClient redisson = Redisson.create(config); + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + for (int i = 0; i < 10; i++) { + Future f = queue1.takeAsync(); + f.cancel(true); + } + assertThat(queue1.add(1)).isTrue(); + assertThat(queue1.add(2)).isTrue(); + assertThat(queue1.size()).isEqualTo(2); + + redisson.shutdown(); + } + + @Test + public void testPollAsyncCancel() { + Config config = createConfig(); + config.useSingleServer().setConnectionMinimumIdleSize(1).setConnectionPoolSize(1); + + RedissonClient redisson = Redisson.create(config); + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + for (int i = 0; i < 10; i++) { + Future f = queue1.pollAsync(1, TimeUnit.SECONDS); + f.cancel(true); + } + assertThat(queue1.add(1)).isTrue(); + assertThat(queue1.add(2)).isTrue(); + assertThat(queue1.size()).isEqualTo(2); + + redisson.shutdown(); + } + + @Test public void testPollFromAny() throws InterruptedException { final RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); @@ -225,14 +261,14 @@ public class RedissonBlockingQueueTest extends BaseTest { try { // blocking int item = queue.take(); - assertTrue(item > 0 && item <= total); + assertThat(item > 0 && item <= total).isTrue(); } catch (InterruptedException exception) { - fail(); + Assert.fail(); } count++; } - assertThat(counter.get(), equalTo(total)); + assertThat(counter.get()).isEqualTo(total); queue.delete(); } From 73452d9974566963442c41f3dfeba3bdda8369c1 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Mar 2016 16:23:33 +0300 Subject: [PATCH 02/20] RemoteService implemented. #434 --- src/main/java/org/redisson/Redisson.java | 9 +- .../java/org/redisson/RedissonClient.java | 8 + .../org/redisson/RedissonRemoteService.java | 152 ++++++++++++++++++ .../java/org/redisson/RemoteServiceKey.java | 68 ++++++++ .../org/redisson/RemoteServiceMethod.java | 39 +++++ .../org/redisson/RemoteServiceRequest.java | 54 +++++++ .../org/redisson/RemoteServiceResponse.java | 42 +++++ .../client/handler/CommandDecoder.java | 8 +- .../org/redisson/codec/JsonJacksonCodec.java | 15 ++ .../org/redisson/core/RRemoteService.java | 36 +++++ .../redisson/RedissonRemoteServiceTest.java | 83 ++++++++++ 11 files changed, 509 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/redisson/RedissonRemoteService.java create mode 100644 src/main/java/org/redisson/RemoteServiceKey.java create mode 100644 src/main/java/org/redisson/RemoteServiceMethod.java create mode 100644 src/main/java/org/redisson/RemoteServiceRequest.java create mode 100644 src/main/java/org/redisson/RemoteServiceResponse.java create mode 100644 src/main/java/org/redisson/core/RRemoteService.java create mode 100644 src/test/java/org/redisson/RedissonRemoteServiceTest.java diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 648beb275..096483e1d 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -58,10 +58,10 @@ import org.redisson.core.RListMultimapCache; import org.redisson.core.RLock; import org.redisson.core.RMap; import org.redisson.core.RMapCache; -import org.redisson.core.RMultimapCache; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; import org.redisson.core.RReadWriteLock; +import org.redisson.core.RRemoteService; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSemaphore; @@ -87,6 +87,7 @@ public class Redisson implements RedissonClient { private final CommandExecutor commandExecutor; private final ConnectionManager connectionManager; private final Config config; + private final RedissonRemoteService remoteService; private final UUID id = UUID.randomUUID(); @@ -114,6 +115,7 @@ public class Redisson implements RedissonClient { } commandExecutor = new CommandSyncService(connectionManager); evictionScheduler = new EvictionScheduler(commandExecutor); + remoteService = new RedissonRemoteService(this); } private void validate(SingleServerConfig config) { @@ -369,6 +371,10 @@ public class Redisson implements RedissonClient { return new RedissonScript(commandExecutor); } + public RRemoteService getRemoteSerivce() { + return remoteService; + } + @Override public RSortedSet getSortedSet(String name) { return new RedissonSortedSet(commandExecutor, name); @@ -501,6 +507,7 @@ public class Redisson implements RedissonClient { @Override public void shutdown() { + remoteService.shutdown(); connectionManager.shutdown(); } diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index 270877e9f..edb10dced 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -45,6 +45,7 @@ import org.redisson.core.RMapCache; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; import org.redisson.core.RReadWriteLock; +import org.redisson.core.RRemoteService; import org.redisson.core.RScoredSortedSet; import org.redisson.core.RScript; import org.redisson.core.RSemaphore; @@ -585,6 +586,13 @@ public interface RedissonClient { */ RScript getScript(); + /** + * Returns object for remote operations + * + * @return + */ + RRemoteService getRemoteSerivce(); + /** * Return batch object which executes group of * command in pipeline. diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java new file mode 100644 index 000000000..d3dac19b7 --- /dev/null +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -0,0 +1,152 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.core.MessageListener; +import org.redisson.core.RBlockingQueue; +import org.redisson.core.RRemoteService; +import org.redisson.core.RTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBufUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.ThreadLocalRandom; + +public class RedissonRemoteService implements RRemoteService { + + private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class); + + private final Map beans = PlatformDependent.newConcurrentHashMap(); + private final Queue> futures = new ConcurrentLinkedQueue>(); + + private final Redisson redisson; + + public RedissonRemoteService(Redisson redisson) { + this.redisson = redisson; + } + + @Override + public void register(Class serviceInterface, T object) { + for (Method method : serviceInterface.getMethods()) { + RemoteServiceMethod value = new RemoteServiceMethod(method, object); + RemoteServiceKey key = new RemoteServiceKey(serviceInterface, method.getName()); + if (beans.put(key, value) != null) { + return; + } + } + + String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + subscribe(serviceInterface, requestQueue); + } + + private void subscribe(final Class serviceInterface, final RBlockingQueue requestQueue) { + Future take = requestQueue.takeAsync(); + futures.add(take); + take.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + RemoteServiceRequest request = future.getNow(); + RemoteServiceMethod method = beans.get(new RemoteServiceKey(serviceInterface, request.getMethodName())); + String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + request.getRequestId(); + RTopic topic = redisson.getTopic(responseName); + RemoteServiceResponse response; + try { + Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); + response = new RemoteServiceResponse(result); + } catch (Exception e) { + e.getCause().printStackTrace(); + response = new RemoteServiceResponse(e.getCause()); + log.error("Can't execute: " + method.getMethod().getName() + " with args: " + request.getArgs(), e); + } + + long clients = topic.publish(response); + if (clients == 0) { + log.error("None of clients has not received a response for request {}", request); + } + + futures.remove(future); + subscribe(serviceInterface, requestQueue); + } + }); + } + + @Override + public T get(final Class serviceInterface) { + InvocationHandler handler = new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + String requestId = generateRequestId(); + System.out.println(requestId); + + String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args)); + + String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + requestId; + final RTopic topic = redisson.getTopic(responseName); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference response = new AtomicReference(); + int listenerId = topic.addListener(new MessageListener() { + @Override + public void onMessage(String channel, RemoteServiceResponse msg) { + response.set(msg); + latch.countDown(); + } + }); + + latch.await(); + topic.removeListener(listenerId); + RemoteServiceResponse msg = response.get(); + if (msg.getError() != null) { + throw msg.getError(); + } + return msg.getResult(); + } + }; + return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[] {serviceInterface}, handler); + } + + private String generateRequestId() { + byte[] id = new byte[16]; + // TODO JDK UPGRADE replace to native ThreadLocalRandom + ThreadLocalRandom.current().nextBytes(id); + return ByteBufUtil.hexDump(id); + } + + public void shutdown() { + for (Future future : futures) { + future.cancel(true); + } + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceKey.java b/src/main/java/org/redisson/RemoteServiceKey.java new file mode 100644 index 000000000..5cadb3a5a --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceKey.java @@ -0,0 +1,68 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +public class RemoteServiceKey { + + private final Class serviceInterface; + private final String methodName; + + public RemoteServiceKey(Class serviceInterface, String methodName) { + super(); + this.serviceInterface = serviceInterface; + this.methodName = methodName; + } + + public String getMethodName() { + return methodName; + } + + public Class getServiceInterface() { + return serviceInterface; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((methodName == null) ? 0 : methodName.hashCode()); + result = prime * result + ((serviceInterface == null) ? 0 : serviceInterface.getName().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + RemoteServiceKey other = (RemoteServiceKey) obj; + if (methodName == null) { + if (other.methodName != null) + return false; + } else if (!methodName.equals(other.methodName)) + return false; + if (serviceInterface == null) { + if (other.serviceInterface != null) + return false; + } else if (!serviceInterface.equals(other.serviceInterface)) + return false; + return true; + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceMethod.java b/src/main/java/org/redisson/RemoteServiceMethod.java new file mode 100644 index 000000000..153c82d19 --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceMethod.java @@ -0,0 +1,39 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.lang.reflect.Method; + +public class RemoteServiceMethod { + + private final Object bean; + private final Method method; + + public RemoteServiceMethod(Method method, Object bean) { + super(); + this.method = method; + this.bean = bean; + } + + public Object getBean() { + return bean; + } + + public Method getMethod() { + return method; + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceRequest.java b/src/main/java/org/redisson/RemoteServiceRequest.java new file mode 100644 index 000000000..3934a78b7 --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceRequest.java @@ -0,0 +1,54 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Arrays; + +public class RemoteServiceRequest { + + private String requestId; + private String methodName; + private Object[] args; + + public RemoteServiceRequest() { + } + + public RemoteServiceRequest(String requestId, String methodName, Object[] args) { + super(); + this.requestId = requestId; + this.methodName = methodName; + this.args = args; + } + + public String getRequestId() { + return requestId; + } + + public Object[] getArgs() { + return args; + } + + public String getMethodName() { + return methodName; + } + + @Override + public String toString() { + return "[requestId=" + requestId + ", methodName=" + methodName + ", args=" + + Arrays.toString(args) + "]"; + } + +} diff --git a/src/main/java/org/redisson/RemoteServiceResponse.java b/src/main/java/org/redisson/RemoteServiceResponse.java new file mode 100644 index 000000000..dd14bf06c --- /dev/null +++ b/src/main/java/org/redisson/RemoteServiceResponse.java @@ -0,0 +1,42 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +public class RemoteServiceResponse { + + private Object result; + private Throwable error; + + public RemoteServiceResponse() { + } + + public RemoteServiceResponse(Object result) { + this.result = result; + } + + public RemoteServiceResponse(Throwable error) { + this.error = error; + } + + public Throwable getError() { + return error; + } + + public Object getResult() { + return result; + } + +} diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index f7b150e9e..ac868aafd 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -84,12 +84,12 @@ public class CommandDecoder extends ReplayingDecoder { currentDecoder = StringCodec.INSTANCE.getValueDecoder(); } - if (log.isTraceEnabled()) { - log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); - } - if (state() == null) { state(new State()); + + if (log.isTraceEnabled()) { + log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); + } } state().setDecoderState(null); diff --git a/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/src/main/java/org/redisson/codec/JsonJacksonCodec.java index 711e1156f..ab518b0ed 100755 --- a/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -23,8 +23,15 @@ import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonIdentityInfo; +import com.fasterxml.jackson.annotation.JsonIdentityReference; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.ObjectIdGenerators; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.MapperFeature; @@ -32,6 +39,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder; import io.netty.buffer.ByteBuf; @@ -49,6 +57,12 @@ public class JsonJacksonCodec implements Codec { public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec(); + @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id") + @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.PUBLIC_ONLY, setterVisibility = Visibility.PUBLIC_ONLY, isGetterVisibility = Visibility.PUBLIC_ONLY) + public static class ThrowableMixIn { + + } + private final ObjectMapper mapObjectMapper = initObjectMapper(); protected ObjectMapper initObjectMapper() { @@ -111,6 +125,7 @@ public class JsonJacksonCodec implements Codec { objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true); objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); + objectMapper.addMixIn(Throwable.class, ThrowableMixIn.class); } @Override diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java new file mode 100644 index 000000000..f7066dc9d --- /dev/null +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -0,0 +1,36 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +public interface RRemoteService { + + /** + * Register object as remote service + * + * @param remoteInterface + * @param object + */ + void register(Class remoteInterface, T object); + + /** + * Get remote service object for remote invocations + * + * @param remoteInterface + * @return + */ + T get(Class remoteInterface); + +} diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java new file mode 100644 index 000000000..b6cede708 --- /dev/null +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -0,0 +1,83 @@ +package org.redisson; + +import org.junit.Assert; +import org.junit.Test; +import static org.assertj.core.api.Assertions.*; + +import java.io.IOException; + +public class RedissonRemoteServiceTest extends BaseTest { + + public interface RemoteInterface { + + void voidMethod(String name, Long param); + + Long resultMethod(Long value); + + void errorMethod() throws IOException; + + void errorMethodWithCause(); + + } + + public class RemoteImpl implements RemoteInterface { + + @Override + public void voidMethod(String name, Long param) { + System.out.println(name + " " + param); + } + + @Override + public Long resultMethod(Long value) { + return value*2; + } + + @Override + public void errorMethod() throws IOException { + throw new IOException("Checking error throw"); + } + + @Override + public void errorMethodWithCause() { + try { + int s = 2 / 0; + } catch (Exception e) { + throw new RuntimeException("Checking error throw", e); + } + } + + + } + + @Test + public void testInvocations() { + RedissonClient r1 = Redisson.create(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RedissonClient r2 = Redisson.create(); + RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class); + + ri.voidMethod("someName", 100L); + assertThat(ri.resultMethod(100L)).isEqualTo(200); + + try { + ri.errorMethod(); + Assert.fail(); + } catch (IOException e) { + assertThat(e.getMessage()).isEqualTo("Checking error throw"); + } + + try { + ri.errorMethodWithCause(); + Assert.fail(); + } catch (Exception e) { + assertThat(e.getCause()).isInstanceOf(ArithmeticException.class); + assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); + } + + + r1.shutdown(); + r2.shutdown(); + } + +} From f152c23d57004296e87ef356590492dabbe4f0d1 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Mar 2016 16:31:12 +0300 Subject: [PATCH 03/20] sysout removed --- src/main/java/org/redisson/RedissonRemoteService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index d3dac19b7..176897b90 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -106,7 +106,6 @@ public class RedissonRemoteService implements RRemoteService { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String requestId = generateRequestId(); - System.out.println(requestId); String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); From 43e632bfdf93036e1747b1436b0fedaa3a3c8a11 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Mar 2016 16:39:39 +0300 Subject: [PATCH 04/20] minor RemoteService improvements --- src/main/java/org/redisson/RedissonRemoteService.java | 5 ++--- src/main/java/org/redisson/RemoteServiceRequest.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 176897b90..0beebb71f 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -84,14 +84,13 @@ public class RedissonRemoteService implements RRemoteService { Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); response = new RemoteServiceResponse(result); } catch (Exception e) { - e.getCause().printStackTrace(); response = new RemoteServiceResponse(e.getCause()); - log.error("Can't execute: " + method.getMethod().getName() + " with args: " + request.getArgs(), e); + log.error("Can't execute: " + request, e); } long clients = topic.publish(response); if (clients == 0) { - log.error("None of clients has not received a response for request {}", request); + log.error("None of clients has not received a response for: {}", request); } futures.remove(future); diff --git a/src/main/java/org/redisson/RemoteServiceRequest.java b/src/main/java/org/redisson/RemoteServiceRequest.java index 3934a78b7..434784456 100644 --- a/src/main/java/org/redisson/RemoteServiceRequest.java +++ b/src/main/java/org/redisson/RemoteServiceRequest.java @@ -47,7 +47,7 @@ public class RemoteServiceRequest { @Override public String toString() { - return "[requestId=" + requestId + ", methodName=" + methodName + ", args=" + return "RemoteServiceRequest[requestId=" + requestId + ", methodName=" + methodName + ", args=" + Arrays.toString(args) + "]"; } From 58472c6cf90de1d2b210ad3f75e12290b88a4ffc Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Mar 2016 16:50:37 +0300 Subject: [PATCH 05/20] RRemoteService.register method with custom executors amount added. #434 --- .../org/redisson/RedissonRemoteService.java | 40 ++++++++++++------- .../org/redisson/core/RRemoteService.java | 11 ++++- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 0beebb71f..55455db33 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -51,21 +51,31 @@ public class RedissonRemoteService implements RRemoteService { } @Override - public void register(Class serviceInterface, T object) { - for (Method method : serviceInterface.getMethods()) { + public void register(Class remoteInterface, T object) { + register(remoteInterface, object, 1); + } + + @Override + public void register(Class remoteInterface, T object, int executorsAmount) { + if (executorsAmount < 1) { + throw new IllegalArgumentException("executorsAmount can't be lower than 1"); + } + for (Method method : remoteInterface.getMethods()) { RemoteServiceMethod value = new RemoteServiceMethod(method, object); - RemoteServiceKey key = new RemoteServiceKey(serviceInterface, method.getName()); + RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName()); if (beans.put(key, value) != null) { return; } } - - String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; - RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); - subscribe(serviceInterface, requestQueue); + + for (int i = 0; i < executorsAmount; i++) { + String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}"; + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); + subscribe(remoteInterface, requestQueue); + } } - private void subscribe(final Class serviceInterface, final RBlockingQueue requestQueue) { + private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue) { Future take = requestQueue.takeAsync(); futures.add(take); take.addListener(new FutureListener() { @@ -76,8 +86,8 @@ public class RedissonRemoteService implements RRemoteService { } RemoteServiceRequest request = future.getNow(); - RemoteServiceMethod method = beans.get(new RemoteServiceKey(serviceInterface, request.getMethodName())); - String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + request.getRequestId(); + RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); + String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + request.getRequestId(); RTopic topic = redisson.getTopic(responseName); RemoteServiceResponse response; try { @@ -94,23 +104,23 @@ public class RedissonRemoteService implements RRemoteService { } futures.remove(future); - subscribe(serviceInterface, requestQueue); + subscribe(remoteInterface, requestQueue); } }); } @Override - public T get(final Class serviceInterface) { + public T get(final Class remoteInterface) { InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String requestId = generateRequestId(); - String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}"; + String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}"; RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args)); - String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + requestId; + String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId; final RTopic topic = redisson.getTopic(responseName); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference response = new AtomicReference(); @@ -131,7 +141,7 @@ public class RedissonRemoteService implements RRemoteService { return msg.getResult(); } }; - return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[] {serviceInterface}, handler); + return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler); } private String generateRequestId() { diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java index f7066dc9d..c322c512b 100644 --- a/src/main/java/org/redisson/core/RRemoteService.java +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -18,13 +18,22 @@ package org.redisson.core; public interface RRemoteService { /** - * Register object as remote service + * Register remote service with single executor * * @param remoteInterface * @param object */ void register(Class remoteInterface, T object); + /** + * Register remote service with custom executors amount + * + * @param remoteInterface + * @param object + * @param executorsAmount + */ + void register(Class remoteInterface, T object, int executorsAmount); + /** * Get remote service object for remote invocations * From 7fc81160668fd2165b49a0bb35c4ad5ab487e4d4 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Mar 2016 17:02:43 +0300 Subject: [PATCH 06/20] RemoteService.get method with timeout invocation added. #434 --- .../java/org/redisson/RedissonRemoteService.java | 14 ++++++++++++-- .../java/org/redisson/core/RRemoteService.java | 13 +++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 55455db33..4126a1d96 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.redisson.core.MessageListener; @@ -110,7 +111,12 @@ public class RedissonRemoteService implements RRemoteService { } @Override - public T get(final Class remoteInterface) { + public T get(Class remoteInterface) { + return get(remoteInterface, -1, null); + } + + @Override + public T get(final Class remoteInterface, final int timeout, final TimeUnit timeUnit) { InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { @@ -132,7 +138,11 @@ public class RedissonRemoteService implements RRemoteService { } }); - latch.await(); + if (timeout == -1) { + latch.await(); + } else { + latch.await(timeout, timeUnit); + } topic.removeListener(listenerId); RemoteServiceResponse msg = response.get(); if (msg.getError() != null) { diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java index c322c512b..f8f27bd1a 100644 --- a/src/main/java/org/redisson/core/RRemoteService.java +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -15,6 +15,8 @@ */ package org.redisson.core; +import java.util.concurrent.TimeUnit; + public interface RRemoteService { /** @@ -42,4 +44,15 @@ public interface RRemoteService { */ T get(Class remoteInterface); + /** + * Get remote service object for remote invocations + * with specified timeout invocation + * + * @param remoteInterface + * @param timeout - timeout invocation + * @param timeUnit + * @return + */ + T get(Class remoteInterface, int timeout, TimeUnit timeUnit); + } From 7884c07b095b8b4bfee69444615239ceeaf6b22a Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Mar 2016 17:12:41 +0300 Subject: [PATCH 07/20] comment fixed --- src/main/java/org/redisson/core/RRemoteService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/redisson/core/RRemoteService.java b/src/main/java/org/redisson/core/RRemoteService.java index f8f27bd1a..1b75c5dc3 100644 --- a/src/main/java/org/redisson/core/RRemoteService.java +++ b/src/main/java/org/redisson/core/RRemoteService.java @@ -46,10 +46,10 @@ public interface RRemoteService { /** * Get remote service object for remote invocations - * with specified timeout invocation + * with specified invocation timeout * * @param remoteInterface - * @param timeout - timeout invocation + * @param timeout - invocation timeout * @param timeUnit * @return */ From 5e2cbe6a2fbccf1f1b918d49168347d8ced699c6 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Mar 2016 18:27:13 +0300 Subject: [PATCH 08/20] RemoteSerivce shutdown process optimization. #446 --- src/main/java/org/redisson/Redisson.java | 5 +-- .../org/redisson/RedissonRemoteService.java | 11 ------ .../redisson/command/CommandAsyncService.java | 36 ++++++++++++------- .../connection/ConnectionManager.java | 2 ++ .../MasterSlaveConnectionManager.java | 9 +++++ 5 files changed, 36 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 096483e1d..f1fdf2953 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -87,7 +87,6 @@ public class Redisson implements RedissonClient { private final CommandExecutor commandExecutor; private final ConnectionManager connectionManager; private final Config config; - private final RedissonRemoteService remoteService; private final UUID id = UUID.randomUUID(); @@ -115,7 +114,6 @@ public class Redisson implements RedissonClient { } commandExecutor = new CommandSyncService(connectionManager); evictionScheduler = new EvictionScheduler(commandExecutor); - remoteService = new RedissonRemoteService(this); } private void validate(SingleServerConfig config) { @@ -372,7 +370,7 @@ public class Redisson implements RedissonClient { } public RRemoteService getRemoteSerivce() { - return remoteService; + return new RedissonRemoteService(this); } @Override @@ -507,7 +505,6 @@ public class Redisson implements RedissonClient { @Override public void shutdown() { - remoteService.shutdown(); connectionManager.shutdown(); } diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 4126a1d96..09a9c8d18 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -19,8 +19,6 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -43,7 +41,6 @@ public class RedissonRemoteService implements RRemoteService { private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class); private final Map beans = PlatformDependent.newConcurrentHashMap(); - private final Queue> futures = new ConcurrentLinkedQueue>(); private final Redisson redisson; @@ -78,7 +75,6 @@ public class RedissonRemoteService implements RRemoteService { private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue) { Future take = requestQueue.takeAsync(); - futures.add(take); take.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -104,7 +100,6 @@ public class RedissonRemoteService implements RRemoteService { log.error("None of clients has not received a response for: {}", request); } - futures.remove(future); subscribe(remoteInterface, requestQueue); } }); @@ -161,10 +156,4 @@ public class RedissonRemoteService implements RRemoteService { return ByteBufUtil.hexDump(id); } - public void shutdown() { - for (Future future : futures) { - future.cancel(true); - } - } - } diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index f2e63379a..d7f0c6126 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -462,18 +462,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { int timeoutTime = connectionManager.getConfig().getTimeout(); if (skipTimeout.contains(details.getCommand().getName())) { Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString()); - details.getMainPromise().addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isCancelled()) { - return; - } - // cancel handling for commands from skipTimeout collection - if (details.getAttemptPromise().cancel(true)) { - connection.forceReconnectAsync(); - } - } - }); + handleBlockingOperations(details, connection); if (popTimeout == 0) { return; } @@ -494,6 +483,29 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.setTimeout(timeout); } + private void handleBlockingOperations(final AsyncDetails details, final RedisConnection connection) { + final FutureListener listener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + details.getMainPromise().cancel(true); + } + }; + details.getMainPromise().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isCancelled()) { + connectionManager.getShutdownPromise().removeListener(listener); + return; + } + // cancel handling for commands from skipTimeout collection + if (details.getAttemptPromise().cancel(true)) { + connection.forceReconnectAsync(); + } + } + }); + connectionManager.getShutdownPromise().addListener(listener); + } + private void checkConnectionFuture(final NodeSource source, final AsyncDetails details) { if (details.getAttemptPromise().isDone() || details.getMainPromise().isCancelled() || details.getConnectionFuture().isCancelled()) { diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 48b2e2c98..21a275ddb 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -107,5 +107,7 @@ public interface ConnectionManager { Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); InfinitySemaphoreLatch getShutdownLatch(); + + Future getShutdownPromise(); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 1fb69f386..065e66f75 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -122,6 +122,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected final Map entries = PlatformDependent.newConcurrentHashMap(); + private final Promise shutdownPromise; + private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch(); private final Set clients = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); @@ -156,6 +158,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.socketChannelClass = NioSocketChannel.class; } this.codec = cfg.getCodec(); + this.shutdownPromise = group.next().newPromise(); this.isClusterMode = cfg.isClusterConfig(); } @@ -674,6 +677,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdown() { + shutdownPromise.trySuccess(true); shutdownLatch.closeAndAwaitUninterruptibly(); for (MasterSlaveEntry entry : entries.values()) { entry.shutdown(); @@ -731,6 +735,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public InfinitySemaphoreLatch getShutdownLatch() { return shutdownLatch; } + + @Override + public Future getShutdownPromise() { + return shutdownPromise; + } @Override public ConnectionEventsHub getConnectionEventsHub() { From ac3278d94b87a1a71c0aa13b0d7bc7bcf8f7e438 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 10:10:56 +0300 Subject: [PATCH 09/20] RemoteService timeout handling fixed. #434 --- .../org/redisson/RedissonRemoteService.java | 12 ++++++--- .../org/redisson/RemoteServiceResponse.java | 5 ++++ .../redisson/command/CommandAsyncService.java | 2 +- .../redisson/RedissonRemoteServiceTest.java | 27 ++++++++++++++++++- 4 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 09a9c8d18..b80124260 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -23,6 +23,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.redisson.client.RedisException; +import org.redisson.client.RedisTimeoutException; import org.redisson.core.MessageListener; import org.redisson.core.RBlockingQueue; import org.redisson.core.RRemoteService; @@ -97,7 +99,7 @@ public class RedissonRemoteService implements RRemoteService { long clients = topic.publish(response); if (clients == 0) { - log.error("None of clients has not received a response for: {}", request); + log.error("None of clients has not received a response: {} for request: {}", response, request); } subscribe(remoteInterface, requestQueue); @@ -119,7 +121,8 @@ public class RedissonRemoteService implements RRemoteService { String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}"; RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName); - requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args)); + RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args); + requestQueue.add(request); String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId; final RTopic topic = redisson.getTopic(responseName); @@ -136,7 +139,10 @@ public class RedissonRemoteService implements RRemoteService { if (timeout == -1) { latch.await(); } else { - latch.await(timeout, timeUnit); + if (!latch.await(timeout, timeUnit)) { + topic.removeListener(listenerId); + throw new RedisTimeoutException("No response after " + timeUnit.toMillis(timeout) + "ms for request: " + request); + } } topic.removeListener(listenerId); RemoteServiceResponse msg = response.get(); diff --git a/src/main/java/org/redisson/RemoteServiceResponse.java b/src/main/java/org/redisson/RemoteServiceResponse.java index dd14bf06c..09ebc5999 100644 --- a/src/main/java/org/redisson/RemoteServiceResponse.java +++ b/src/main/java/org/redisson/RemoteServiceResponse.java @@ -38,5 +38,10 @@ public class RemoteServiceResponse { public Object getResult() { return result; } + + @Override + public String toString() { + return "RemoteServiceResponse [result=" + result + ", error=" + error + "]"; + } } diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index d7f0c6126..faba6d1fc 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -493,8 +493,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.getMainPromise().addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { + connectionManager.getShutdownPromise().removeListener(listener); if (!future.isCancelled()) { - connectionManager.getShutdownPromise().removeListener(listener); return; } // cancel handling for commands from skipTimeout collection diff --git a/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/src/test/java/org/redisson/RedissonRemoteServiceTest.java index b6cede708..c8daedd90 100644 --- a/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -2,9 +2,12 @@ package org.redisson; import org.junit.Assert; import org.junit.Test; +import org.redisson.client.RedisTimeoutException; + import static org.assertj.core.api.Assertions.*; import java.io.IOException; +import java.util.concurrent.TimeUnit; public class RedissonRemoteServiceTest extends BaseTest { @@ -18,6 +21,8 @@ public class RedissonRemoteServiceTest extends BaseTest { void errorMethodWithCause(); + void timeoutMethod() throws InterruptedException; + } public class RemoteImpl implements RemoteInterface { @@ -46,8 +51,29 @@ public class RedissonRemoteServiceTest extends BaseTest { } } + @Override + public void timeoutMethod() throws InterruptedException { + Thread.sleep(2000); + } + } + + @Test(expected = RedisTimeoutException.class) + public void testTimeout() throws InterruptedException { + RedissonClient r1 = Redisson.create(); + r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl()); + + RedissonClient r2 = Redisson.create(); + RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, 1, TimeUnit.SECONDS); + + try { + ri.timeoutMethod(); + } finally { + r1.shutdown(); + r2.shutdown(); + } + } @Test public void testInvocations() { @@ -74,7 +100,6 @@ public class RedissonRemoteServiceTest extends BaseTest { assertThat(e.getCause()).isInstanceOf(ArithmeticException.class); assertThat(e.getCause().getMessage()).isEqualTo("/ by zero"); } - r1.shutdown(); r2.shutdown(); From 40d5ad5237cb9d5200248852cfcd05b0dad943dd Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 10:43:52 +0300 Subject: [PATCH 10/20] Delete and expire Multimap methods fix. #447 --- .../java/org/redisson/RedissonMultimap.java | 71 +++++++++++++++++++ .../org/redisson/RedissonSetMultimapTest.java | 55 +++++++++++++- 2 files changed, 125 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/redisson/RedissonMultimap.java b/src/main/java/org/redisson/RedissonMultimap.java index 475660b82..53f4b1944 100644 --- a/src/main/java/org/redisson/RedissonMultimap.java +++ b/src/main/java/org/redisson/RedissonMultimap.java @@ -20,15 +20,18 @@ import java.net.InetSocketAddress; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.Set; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; @@ -152,10 +155,12 @@ public abstract class RedissonMultimap extends RedissonExpirable implement return new EntrySet(); } + @Override public long fastRemove(K ... keys) { return get(fastRemoveAsync(keys)); } + @Override public Future fastRemoveAsync(K ... keys) { if (keys == null || keys.length == 0) { return newSucceededFuture(0L); @@ -184,6 +189,72 @@ public abstract class RedissonMultimap extends RedissonExpirable implement throw new RuntimeException(e); } } + + @Override + public Future deleteAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_AMOUNT, + "local entries = redis.call('hgetall', KEYS[1]); " + + "local keys = {KEYS[1]}; " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "table.insert(keys, name); " + + "end;" + + "end; " + + + "local n = 0 " + + "for i=1, #keys,5000 do " + + "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) " + + "end; " + + "return n;", + Arrays.asList(getName())); + } + + @Override + public Future expireAsync(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local entries = redis.call('hgetall', KEYS[1]); " + + "local keys = {}; " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('pexpire', name, ARGV[1]); " + + "end;" + + "end; " + + "return redis.call('pexpire', KEYS[1], ARGV[1]); ", + Arrays.asList(getName()), timeUnit.toMillis(timeToLive)); + } + + @Override + public Future expireAtAsync(long timestamp) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local entries = redis.call('hgetall', KEYS[1]); " + + "local keys = {}; " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('pexpireat', name, ARGV[1]); " + + "end;" + + "end; " + + "return redis.call('pexpireat', KEYS[1], ARGV[1]); ", + Arrays.asList(getName()), timestamp); + } + + @Override + public Future clearExpireAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local entries = redis.call('hgetall', KEYS[1]); " + + "local keys = {}; " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('persist', name); " + + "end;" + + "end; " + + "return redis.call('persist', KEYS[1]); ", + Arrays.asList(getName())); + } + MapScanResult scanIterator(InetSocketAddress client, long startPos) { Future> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); diff --git a/src/test/java/org/redisson/RedissonSetMultimapTest.java b/src/test/java/org/redisson/RedissonSetMultimapTest.java index ef47fd980..c30256c53 100644 --- a/src/test/java/org/redisson/RedissonSetMultimapTest.java +++ b/src/test/java/org/redisson/RedissonSetMultimapTest.java @@ -1,15 +1,17 @@ package org.redisson; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.Serializable; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.redisson.core.RSetMultimap; -import static org.assertj.core.api.Assertions.*; public class RedissonSetMultimapTest extends BaseTest { @@ -273,5 +275,56 @@ public class RedissonSetMultimapTest extends BaseTest { assertThat(allValues).containsOnlyElementsOf(values); } + @Test + public void testExpire() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimap("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expire(100, TimeUnit.MILLISECONDS); + + Thread.sleep(500); + + assertThat(map.size()).isZero(); + } + + @Test + public void testExpireAt() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimap("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expireAt(System.currentTimeMillis() + 100); + + Thread.sleep(500); + + assertThat(map.size()).isZero(); + } + + @Test + public void testClearExpire() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimap("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expireAt(System.currentTimeMillis() + 100); + + map.clearExpire(); + + Thread.sleep(500); + + assertThat(map.size()).isEqualTo(2); + } + + @Test + public void testDelete() { + RSetMultimap map = redisson.getSetMultimap("simple"); + map.put("1", "2"); + map.put("2", "3"); + assertThat(map.delete()).isTrue(); + + RSetMultimap map2 = redisson.getSetMultimap("simple1"); + assertThat(map2.delete()).isFalse(); + } } From dead46a732846e7f45209f5496e03025c1d7c63d Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 10:45:31 +0300 Subject: [PATCH 11/20] minor changes --- src/main/java/org/redisson/RedissonMultimap.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/org/redisson/RedissonMultimap.java b/src/main/java/org/redisson/RedissonMultimap.java index 53f4b1944..a072520a5 100644 --- a/src/main/java/org/redisson/RedissonMultimap.java +++ b/src/main/java/org/redisson/RedissonMultimap.java @@ -214,7 +214,6 @@ public abstract class RedissonMultimap extends RedissonExpirable implement public Future expireAsync(long timeToLive, TimeUnit timeUnit) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local entries = redis.call('hgetall', KEYS[1]); " + - "local keys = {}; " + "for i, v in ipairs(entries) do " + "if i % 2 == 0 then " + "local name = '{' .. KEYS[1] .. '}:' .. v; " + @@ -229,7 +228,6 @@ public abstract class RedissonMultimap extends RedissonExpirable implement public Future expireAtAsync(long timestamp) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local entries = redis.call('hgetall', KEYS[1]); " + - "local keys = {}; " + "for i, v in ipairs(entries) do " + "if i % 2 == 0 then " + "local name = '{' .. KEYS[1] .. '}:' .. v; " + @@ -244,7 +242,6 @@ public abstract class RedissonMultimap extends RedissonExpirable implement public Future clearExpireAsync() { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local entries = redis.call('hgetall', KEYS[1]); " + - "local keys = {}; " + "for i, v in ipairs(entries) do " + "if i % 2 == 0 then " + "local name = '{' .. KEYS[1] .. '}:' .. v; " + From 29c30a307e18f86b1bcbc75da80b94ebf618e8e9 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 10:59:57 +0300 Subject: [PATCH 12/20] Delete method fixed --- src/main/java/org/redisson/RedissonBloomFilter.java | 2 +- src/main/java/org/redisson/RedissonMapCache.java | 2 +- src/main/java/org/redisson/RedissonSetCache.java | 2 +- .../java/org/redisson/client/protocol/RedisCommands.java | 1 + .../org/redisson/reactive/RedissonMapCacheReactive.java | 2 +- .../org/redisson/reactive/RedissonSetCacheReactive.java | 2 +- src/test/java/org/redisson/RedissonSetCacheTest.java | 9 +++++++++ 7 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/redisson/RedissonBloomFilter.java b/src/main/java/org/redisson/RedissonBloomFilter.java index 6b6913039..06fb7b6db 100644 --- a/src/main/java/org/redisson/RedissonBloomFilter.java +++ b/src/main/java/org/redisson/RedissonBloomFilter.java @@ -200,7 +200,7 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF @Override public Future deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getConfigName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getConfigName()); } private void readConfig() { diff --git a/src/main/java/org/redisson/RedissonMapCache.java b/src/main/java/org/redisson/RedissonMapCache.java index 8900fd119..d529c2b03 100644 --- a/src/main/java/org/redisson/RedissonMapCache.java +++ b/src/main/java/org/redisson/RedissonMapCache.java @@ -655,7 +655,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override public Future deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()); } @Override diff --git a/src/main/java/org/redisson/RedissonSetCache.java b/src/main/java/org/redisson/RedissonSetCache.java index 8a9083c75..97f8526a7 100644 --- a/src/main/java/org/redisson/RedissonSetCache.java +++ b/src/main/java/org/redisson/RedissonSetCache.java @@ -474,7 +474,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< @Override public Future deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()); } @Override diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 7d62ca62e..9528685ce 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -208,6 +208,7 @@ public interface RedisCommands { RedisStrictCommand DEL = new RedisStrictCommand("DEL"); RedisStrictCommand DBSIZE = new RedisStrictCommand("DBSIZE"); RedisStrictCommand DEL_BOOL = new RedisStrictCommand("DEL", new BooleanReplayConvertor()); + RedisStrictCommand DEL_OBJECTS = new RedisStrictCommand("DEL", new BooleanAmountReplayConvertor()); RedisStrictCommand DEL_VOID = new RedisStrictCommand("DEL", new VoidReplayConvertor()); RedisCommand GET = new RedisCommand("GET"); diff --git a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 183f9e460..85acad239 100644 --- a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -361,7 +361,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im @Override public Publisher delete() { - return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); + return commandExecutor.writeReactive(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 3bb427930..bce238a28 100644 --- a/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -305,7 +305,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple @Override public Publisher delete() { - return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); + return commandExecutor.writeReactive(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()); } @Override diff --git a/src/test/java/org/redisson/RedissonSetCacheTest.java b/src/test/java/org/redisson/RedissonSetCacheTest.java index a864425fd..9d7c71188 100644 --- a/src/test/java/org/redisson/RedissonSetCacheTest.java +++ b/src/test/java/org/redisson/RedissonSetCacheTest.java @@ -31,6 +31,15 @@ public class RedissonSetCacheTest extends BaseTest { } } + + @Test + public void testDelete() { + RSetCache set = redisson.getSetCache("set"); + assertThat(set.delete()).isFalse(); + set.add(1, 1, TimeUnit.SECONDS); + assertThat(set.delete()).isTrue(); + assertThat(set.delete()).isFalse(); + } @Test public void testEmptyReadAll() { From 928898dc2d8418a6b3d53437c71daa73035301e3 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 11:00:06 +0300 Subject: [PATCH 13/20] RedissonKeysTest refactoring --- src/test/java/org/redisson/RedissonKeysTest.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/test/java/org/redisson/RedissonKeysTest.java b/src/test/java/org/redisson/RedissonKeysTest.java index 26f22b751..4380d26df 100644 --- a/src/test/java/org/redisson/RedissonKeysTest.java +++ b/src/test/java/org/redisson/RedissonKeysTest.java @@ -1,14 +1,12 @@ package org.redisson; -import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.*; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.Set; -import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; import org.redisson.core.RBucket; @@ -26,7 +24,7 @@ public class RedissonKeysTest extends BaseTest { Iterator iterator = redisson.getKeys().getKeysByPattern("test?").iterator(); for (; iterator.hasNext();) { String key = iterator.next(); - MatcherAssert.assertThat(key, Matchers.isOneOf("test1", "test2")); + assertThat(key).isIn("test1", "test2"); } } @@ -57,7 +55,7 @@ public class RedissonKeysTest extends BaseTest { RBucket bucket2 = redisson.getBucket("test2"); bucket2.set("someValue2"); - MatcherAssert.assertThat(redisson.getKeys().randomKey(), Matchers.isOneOf("test1", "test2")); + assertThat(redisson.getKeys().randomKey()).isIn("test1", "test2"); redisson.getKeys().delete("test1"); Assert.assertEquals(redisson.getKeys().randomKey(), "test2"); redisson.flushdb(); @@ -95,10 +93,10 @@ public class RedissonKeysTest extends BaseTest { map.fastPut("1", "2"); Collection keys = redisson.getKeys().findKeysByPattern("test?"); - MatcherAssert.assertThat(keys, Matchers.containsInAnyOrder("test1", "test2")); + assertThat(keys).containsOnly("test1", "test2"); Collection keys2 = redisson.getKeys().findKeysByPattern("test"); - MatcherAssert.assertThat(keys2, Matchers.empty()); + assertThat(keys2).isEmpty(); } @Test From 36487763c040f4581a3c3e4596ecf8732e9d57df Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 11:49:14 +0300 Subject: [PATCH 14/20] Delete and expire MultimapCache methods fixed. #447 --- .../redisson/RedissonListMultimapCache.java | 42 ++++--- .../org/redisson/RedissonMultimapCache.java | 114 ++++++++++++++++++ .../redisson/RedissonSetMultimapCache.java | 42 ++++--- .../RedissonSetMultimapCacheTest.java | 57 ++++++++- 4 files changed, 215 insertions(+), 40 deletions(-) create mode 100644 src/main/java/org/redisson/RedissonMultimapCache.java diff --git a/src/main/java/org/redisson/RedissonListMultimapCache.java b/src/main/java/org/redisson/RedissonListMultimapCache.java index 62a4800ec..2dfce5dd0 100644 --- a/src/main/java/org/redisson/RedissonListMultimapCache.java +++ b/src/main/java/org/redisson/RedissonListMultimapCache.java @@ -22,10 +22,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.command.CommandAsyncExecutor; import org.redisson.core.RListMultimapCache; @@ -39,16 +36,18 @@ import io.netty.util.concurrent.Future; */ public class RedissonListMultimapCache extends RedissonListMultimap implements RListMultimapCache { - private static final RedisCommand EVAL_EXPIRE_KEY = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY); + private final RedissonMultimapCache baseCache; RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); + baseCache = new RedissonMultimapCache<>(connectionManager, name, codec, getTimeoutSetName()); } RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); + baseCache = new RedissonMultimapCache<>(connectionManager, name, codec, getTimeoutSetName()); } public Future containsKeyAsync(Object key) { @@ -207,20 +206,27 @@ public class RedissonListMultimapCache extends RedissonListMultimap @Override public Future expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) { - long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); - - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_EXPIRE_KEY, - "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " - + "if tonumber(ARGV[1]) > 0 then " - + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + - "else " + - "redis.call('zrem', KEYS[2], ARGV[2]); " - + "end; " - + "return 1; " - + "else " - + "return 0; " - + "end", - Arrays.asList(getName(), getTimeoutSetName()), ttlTimeout, key); + return baseCache.expireKeyAsync(key, timeToLive, timeUnit); + } + + @Override + public Future deleteAsync() { + return baseCache.deleteAsync(); + } + + @Override + public Future expireAsync(long timeToLive, TimeUnit timeUnit) { + return baseCache.expireAsync(timeToLive, timeUnit); + } + + @Override + public Future expireAtAsync(long timestamp) { + return baseCache.expireAtAsync(timestamp); + } + + @Override + public Future clearExpireAsync() { + return baseCache.clearExpireAsync(); } } diff --git a/src/main/java/org/redisson/RedissonMultimapCache.java b/src/main/java/org/redisson/RedissonMultimapCache.java new file mode 100644 index 000000000..a33ce15f5 --- /dev/null +++ b/src/main/java/org/redisson/RedissonMultimapCache.java @@ -0,0 +1,114 @@ +package org.redisson; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.command.CommandAsyncExecutor; + +import io.netty.util.concurrent.Future; + +public class RedissonMultimapCache { + + private static final RedisCommand EVAL_EXPIRE_KEY = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY); + + private final CommandAsyncExecutor commandExecutor; + private final String name; + private final Codec codec; + private final String timeoutSetName; + + public RedissonMultimapCache(CommandAsyncExecutor commandExecutor, String name, Codec codec, String timeoutSetName) { + this.commandExecutor = commandExecutor; + this.name = name; + this.codec = codec; + this.timeoutSetName = timeoutSetName; + } + + public Future expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) { + long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); + + return commandExecutor.evalWriteAsync(name, codec, EVAL_EXPIRE_KEY, + "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " + + "if tonumber(ARGV[1]) > 0 then " + + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + + "else " + + "redis.call('zrem', KEYS[2], ARGV[2]); " + + "end; " + + "return 1; " + + "else " + + "return 0; " + + "end", + Arrays.asList(name, timeoutSetName), ttlTimeout, key); + } + + public Future deleteAsync() { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_AMOUNT, + "local entries = redis.call('hgetall', KEYS[1]); " + + "local keys = {KEYS[1], KEYS[2]}; " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "table.insert(keys, name); " + + "end;" + + "end; " + + + "local n = 0 " + + "for i=1, #keys,5000 do " + + "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) " + + "end; " + + "return n;", + Arrays.asList(name, timeoutSetName)); + } + + public Future expireAsync(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag'); " + + "local entries = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('pexpire', name, ARGV[1]); " + + "end;" + + "end; " + + "redis.call('pexpire', KEYS[2], ARGV[1]); " + + "return redis.call('pexpire', KEYS[1], ARGV[1]); ", + Arrays.asList(name, timeoutSetName), timeUnit.toMillis(timeToLive)); + } + + public Future expireAtAsync(long timestamp) { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" + + "local entries = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('pexpireat', name, ARGV[1]); " + + "end;" + + "end; " + + "redis.call('pexpireat', KEYS[2], ARGV[1]); " + + "return redis.call('pexpireat', KEYS[1], ARGV[1]); ", + Arrays.asList(name, timeoutSetName), timestamp); + } + + public Future clearExpireAsync() { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('zrem', KEYS[2], 'redisson__expiretag'); " + + "local entries = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(entries) do " + + "if i % 2 == 0 then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "redis.call('persist', name); " + + "end;" + + "end; " + + "redis.call('persist', KEYS[2]); " + + "return redis.call('persist', KEYS[1]); ", + Arrays.asList(name, timeoutSetName)); + } + + +} diff --git a/src/main/java/org/redisson/RedissonSetMultimapCache.java b/src/main/java/org/redisson/RedissonSetMultimapCache.java index 9b7dee226..ea4b4192a 100644 --- a/src/main/java/org/redisson/RedissonSetMultimapCache.java +++ b/src/main/java/org/redisson/RedissonSetMultimapCache.java @@ -22,10 +22,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.command.CommandAsyncExecutor; import org.redisson.core.RSetMultimapCache; @@ -39,16 +36,18 @@ import io.netty.util.concurrent.Future; */ public class RedissonSetMultimapCache extends RedissonSetMultimap implements RSetMultimapCache { - private static final RedisCommand EVAL_EXPIRE_KEY = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY); + private final RedissonMultimapCache baseCache; RedissonSetMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); + baseCache = new RedissonMultimapCache<>(connectionManager, name, codec, getTimeoutSetName()); } RedissonSetMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); + baseCache = new RedissonMultimapCache<>(connectionManager, name, codec, getTimeoutSetName()); } public Future containsKeyAsync(Object key) { @@ -199,20 +198,27 @@ public class RedissonSetMultimapCache extends RedissonSetMultimap im @Override public Future expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) { - long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); - - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_EXPIRE_KEY, - "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " - + "if tonumber(ARGV[1]) > 0 then " - + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + - "else " + - "redis.call('zrem', KEYS[2], ARGV[2]); " - + "end; " - + "return 1; " - + "else " - + "return 0; " - + "end", - Arrays.asList(getName(), getTimeoutSetName()), ttlTimeout, key); + return baseCache.expireKeyAsync(key, timeToLive, timeUnit); + } + + @Override + public Future deleteAsync() { + return baseCache.deleteAsync(); + } + + @Override + public Future expireAsync(long timeToLive, TimeUnit timeUnit) { + return baseCache.expireAsync(timeToLive, timeUnit); + } + + @Override + public Future expireAtAsync(long timestamp) { + return baseCache.expireAtAsync(timestamp); + } + + @Override + public Future clearExpireAsync() { + return baseCache.clearExpireAsync(); } } diff --git a/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java b/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java index 7fe68f557..442855b8c 100644 --- a/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java +++ b/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java @@ -1,15 +1,13 @@ package org.redisson; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import java.util.Arrays; import java.util.concurrent.TimeUnit; -import org.junit.Assert; import org.junit.Test; -import org.redisson.codec.MsgPackJacksonCodec; import org.redisson.core.RMultimapCache; -import org.redisson.core.RSetCache; +import org.redisson.core.RSetMultimap; public class RedissonSetMultimapCacheTest extends BaseTest { @@ -137,5 +135,56 @@ public class RedissonSetMultimapCacheTest extends BaseTest { } + @Test + public void testExpire() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimapCache("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expire(100, TimeUnit.MILLISECONDS); + + Thread.sleep(500); + + assertThat(map.size()).isZero(); + } + + @Test + public void testExpireAt() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimapCache("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expireAt(System.currentTimeMillis() + 100); + + Thread.sleep(500); + + assertThat(map.size()).isZero(); + } + + @Test + public void testClearExpire() throws InterruptedException { + RSetMultimap map = redisson.getSetMultimapCache("simple"); + map.put("1", "2"); + map.put("2", "3"); + + map.expireAt(System.currentTimeMillis() + 100); + + map.clearExpire(); + + Thread.sleep(500); + + assertThat(map.size()).isEqualTo(2); + } + + @Test + public void testDelete() { + RSetMultimap map = redisson.getSetMultimapCache("simple"); + map.put("1", "2"); + map.put("2", "3"); + assertThat(map.delete()).isTrue(); + + RSetMultimap map2 = redisson.getSetMultimapCache("simple1"); + assertThat(map2.delete()).isFalse(); + } } From f66db14ff7ab4a427f6e3f51957339af2defa557 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 11:49:41 +0300 Subject: [PATCH 15/20] license added --- .../java/org/redisson/RedissonMultimapCache.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/java/org/redisson/RedissonMultimapCache.java b/src/main/java/org/redisson/RedissonMultimapCache.java index a33ce15f5..765ab8021 100644 --- a/src/main/java/org/redisson/RedissonMultimapCache.java +++ b/src/main/java/org/redisson/RedissonMultimapCache.java @@ -1,3 +1,18 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson; import java.util.Arrays; From cdd50516ac3fe64dad4c64a45729e13e85af70f0 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 11:50:55 +0300 Subject: [PATCH 16/20] compilation fixed --- src/main/java/org/redisson/RedissonListMultimapCache.java | 4 ++-- src/main/java/org/redisson/RedissonSetMultimapCache.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/redisson/RedissonListMultimapCache.java b/src/main/java/org/redisson/RedissonListMultimapCache.java index 2dfce5dd0..2ba95e527 100644 --- a/src/main/java/org/redisson/RedissonListMultimapCache.java +++ b/src/main/java/org/redisson/RedissonListMultimapCache.java @@ -41,13 +41,13 @@ public class RedissonListMultimapCache extends RedissonListMultimap RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); - baseCache = new RedissonMultimapCache<>(connectionManager, name, codec, getTimeoutSetName()); + baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); - baseCache = new RedissonMultimapCache<>(connectionManager, name, codec, getTimeoutSetName()); + baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } public Future containsKeyAsync(Object key) { diff --git a/src/main/java/org/redisson/RedissonSetMultimapCache.java b/src/main/java/org/redisson/RedissonSetMultimapCache.java index ea4b4192a..4254d3ed6 100644 --- a/src/main/java/org/redisson/RedissonSetMultimapCache.java +++ b/src/main/java/org/redisson/RedissonSetMultimapCache.java @@ -41,13 +41,13 @@ public class RedissonSetMultimapCache extends RedissonSetMultimap im RedissonSetMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); - baseCache = new RedissonMultimapCache<>(connectionManager, name, codec, getTimeoutSetName()); + baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } RedissonSetMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); - baseCache = new RedissonMultimapCache<>(connectionManager, name, codec, getTimeoutSetName()); + baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } public Future containsKeyAsync(Object key) { From 75ba40227df09878a709f5c814fdc2d6931e3e26 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 13:03:56 +0300 Subject: [PATCH 17/20] MOVED, ASK handling in cluster mode using RBatch. #448 --- .../java/org/redisson/client/handler/CommandDecoder.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index ac868aafd..d8ce3d733 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -139,7 +139,11 @@ public class CommandDecoder extends ReplayingDecoder { cmd.getPromise().tryFailure(e); } if (!cmd.getPromise().isSuccess()) { - error = (RedisException) cmd.getPromise().cause(); + if (!(cmd.getPromise().cause() instanceof RedisMovedException + || cmd.getPromise().cause() instanceof RedisAskException + || cmd.getPromise().cause() instanceof RedisLoadingException)) { + error = (RedisException) cmd.getPromise().cause(); + } } } From fb68ca68113a4b9a91036058ff6a0d31d0e1e583 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 13:04:04 +0300 Subject: [PATCH 18/20] test fixes --- src/test/java/org/redisson/BaseTest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/test/java/org/redisson/BaseTest.java b/src/test/java/org/redisson/BaseTest.java index e1190e384..48c549c92 100644 --- a/src/test/java/org/redisson/BaseTest.java +++ b/src/test/java/org/redisson/BaseTest.java @@ -1,10 +1,8 @@ package org.redisson; -import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; -import org.redisson.client.codec.StringCodec; -import org.redisson.codec.MsgPackJacksonCodec; public abstract class BaseTest { @@ -43,9 +41,9 @@ public abstract class BaseTest { return Redisson.create(config); } - @After - public void after() { - redisson.flushdb(); + @Before + public void before() { + redisson.getKeys().flushall(); } } From fa4bb81023ba2ec6c7bfce8ea3de8a01076ed631 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 13:45:16 +0300 Subject: [PATCH 19/20] [maven-release-plugin] prepare release redisson-2.2.10 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 8907f379a..f553c4252 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.redisson redisson - 2.2.10-SNAPSHOT + 2.2.10 bundle Redisson @@ -15,7 +15,7 @@ scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git - HEAD + redisson-2.2.10 From e909110798d0e0cd69d17a72803c6f692879ce8b Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 Mar 2016 13:45:23 +0300 Subject: [PATCH 20/20] [maven-release-plugin] prepare for next development iteration --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index f553c4252..106c3a909 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.redisson redisson - 2.2.10 + 2.2.11-SNAPSHOT bundle Redisson @@ -15,7 +15,7 @@ scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git scm:git:git@github.com:mrniko/redisson.git - redisson-2.2.10 + HEAD