From 67cebd0e0f649549f9c9d8fdd4089d64faaefe63 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 16 Jan 2018 13:06:31 +0300 Subject: [PATCH 01/13] RBatch.atomic option added. #1242 --- .../main/java/org/redisson/RedissonBatch.java | 15 +++-- .../main/java/org/redisson/api/RBatch.java | 18 ++++-- .../client/handler/CommandDecoder.java | 43 +++++++++++--- .../client/handler/CommandEncoder.java | 2 +- .../client/protocol/CommandsData.java | 18 ++++-- .../redisson/command/CommandBatchService.java | 55 +++++++++++------- .../command/CommandReactiveBatchService.java | 4 +- .../reactive/RedissonBatchReactive.java | 8 ++- .../java/org/redisson/RedissonBatchTest.java | 57 +++++++++++++++++++ 9 files changed, 173 insertions(+), 47 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java index 1d4b62446..075a73171 100644 --- a/redisson/src/main/java/org/redisson/RedissonBatch.java +++ b/redisson/src/main/java/org/redisson/RedissonBatch.java @@ -67,6 +67,7 @@ public class RedissonBatch implements RBatch { private int syncSlaves; private long syncTimeout; private boolean skipResult; + private boolean atomic; protected RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { this.executorService = new CommandBatchService(connectionManager); @@ -241,6 +242,12 @@ public class RedissonBatch implements RBatch { return this; } + @Override + public RBatch atomic() { + this.atomic = true; + return this; + } + @Override public RBatch skipResult() { this.skipResult = true; @@ -267,22 +274,22 @@ public class RedissonBatch implements RBatch { @Override public BatchResult execute() { - return executorService.execute(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval); + return executorService.execute(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval, atomic); } @Override public void executeSkipResult() { - executorService.execute(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval); + executorService.execute(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval, atomic); } @Override public RFuture executeSkipResultAsync() { - return executorService.executeAsync(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval); + return executorService.executeAsync(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval, atomic); } @Override public RFuture> executeAsync() { - return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval); + return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval, atomic); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RBatch.java b/redisson/src/main/java/org/redisson/api/RBatch.java index 3671fe8a9..a816135c9 100644 --- a/redisson/src/main/java/org/redisson/api/RBatch.java +++ b/redisson/src/main/java/org/redisson/api/RBatch.java @@ -23,11 +23,9 @@ import org.redisson.client.codec.Codec; /** * Interface for using pipeline feature. *

- * All method invocations on objects - * from this interface are batched to separate queue and could be executed later + * All method invocations on objects got through this interface + * are batched to separate queue and could be executed later * with execute() or executeAsync() methods. - *

- * Please be aware, atomicity is not guaranteed. * * * @author Nikita Koksharov @@ -410,9 +408,19 @@ public interface RBatch { @Deprecated RFuture executeSkipResultAsync(); + /** + * Atomically executes all batched commands as a single command. + *

+ * Please note, that in cluster mode all objects should be on the same cluster slot. + * https://github.com/antirez/redis/issues/3682 + * + * @return + */ + RBatch atomic(); + /** * Inform Redis not to send reply for this batch. - * Such approach saves response bandwidth. + * Such approach saves network traffic. *

* NOTE: Redis 3.2+ required * diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 2074089db..71207b635 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -17,6 +17,7 @@ package org.redisson.client.handler; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.redisson.client.RedisAskException; @@ -31,6 +32,8 @@ import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.QueueCommand; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder; @@ -160,22 +163,48 @@ public class CommandDecoder extends ReplayingDecoder { Throwable error = null; while (in.writerIndex() > in.readerIndex()) { - CommandData cmd = null; + CommandData commandData = null; try { checkpoint(); state().setBatchIndex(i); - cmd = (CommandData) commandBatch.getCommands().get(i); - decode(in, cmd, null, ctx.channel()); + RedisCommand cmd = commandBatch.getCommands().get(i).getCommand(); + if (!commandBatch.isAtomic() + || RedisCommands.EXEC.getName().equals(cmd.getName()) + || RedisCommands.WAIT.getName().equals(cmd.getName())) { + commandData = (CommandData) commandBatch.getCommands().get(i); + } + + decode(in, commandData, null, ctx.channel()); + + if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) { + List objects = (List) commandData.getPromise().getNow(); + Iterator iter = objects.iterator(); + boolean multiFound = false; + for (CommandData command : commandBatch.getCommands()) { + if (multiFound) { + if (!iter.hasNext()) { + break; + } + Object res = iter.next(); + + handleResult((CommandData) command, null, res, false, ctx.channel()); + } + + if (RedisCommands.MULTI.getName().equals(command.getCommand().getName())) { + multiFound = true; + } + } + } } catch (Exception e) { - cmd.tryFailure(e); + commandData.tryFailure(e); } i++; - if (!cmd.isSuccess()) { - error = cmd.cause(); + if (commandData != null && !commandData.isSuccess()) { + error = commandData.cause(); } } - if (commandBatch.isNoResult() || i == commandBatch.getCommands().size()) { + if (commandBatch.isSkipResult() || i == commandBatch.getCommands().size()) { RPromise promise = commandBatch.getPromise(); if (error != null) { if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) { diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java index 0f8d40866..1ffa15cbc 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -96,7 +96,7 @@ public class CommandEncoder extends MessageToByteEncoder> { log.trace("channel: {} message: {}", ctx.channel(), out.toString(CharsetUtil.UTF_8)); } } catch (Exception e) { - msg.getPromise().tryFailure(e); + msg.tryFailure(e); throw e; } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java index 982dea493..6c5b4caf9 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java @@ -29,25 +29,31 @@ public class CommandsData implements QueueCommand { private final List> commands; private final RPromise promise; - private final boolean noResult; + private final boolean skipResult; + private final boolean atomic; public CommandsData(RPromise promise, List> commands) { - this(promise, commands, false); + this(promise, commands, false, false); } - public CommandsData(RPromise promise, List> commands, boolean noResult) { + public CommandsData(RPromise promise, List> commands, boolean skipResult, boolean atomic) { super(); this.promise = promise; this.commands = commands; - this.noResult = noResult; + this.skipResult = skipResult; + this.atomic = atomic; } public RPromise getPromise() { return promise; } - public boolean isNoResult() { - return noResult; + public boolean isAtomic() { + return atomic; + } + + public boolean isSkipResult() { + return skipResult; } public List> getCommands() { diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index e8b37bbd6..c94b6d6d2 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -140,18 +140,18 @@ public class CommandBatchService extends CommandAsyncService { } public BatchResult execute() { - RFuture> f = executeAsync(0, 0, false, 0, 0, 0); + RFuture> f = executeAsync(0, 0, false, 0, 0, 0, false); return get(f); } - public BatchResult execute(int syncSlaves, long syncTimeout, boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) { - RFuture> f = executeAsync(syncSlaves, syncTimeout, noResult, responseTimeout, retryAttempts, retryInterval); + public BatchResult execute(int syncSlaves, long syncTimeout, boolean noResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) { + RFuture> f = executeAsync(syncSlaves, syncTimeout, noResult, responseTimeout, retryAttempts, retryInterval, atomic); return get(f); } public RFuture executeAsyncVoid() { final RedissonPromise promise = new RedissonPromise(); - RFuture> res = executeAsync(0, 0, false, 0, 0, 0); + RFuture> res = executeAsync(0, 0, false, 0, 0, 0, false); res.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { @@ -166,10 +166,10 @@ public class CommandBatchService extends CommandAsyncService { } public RFuture> executeAsync() { - return executeAsync(0, 0, false, 0, 0, 0); + return executeAsync(0, 0, false, 0, 0, 0, false); } - public RFuture executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) { + public RFuture executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) { if (executed) { throw new IllegalStateException("Batch already executed!"); } @@ -179,6 +179,15 @@ public class CommandBatchService extends CommandAsyncService { } executed = true; + if (atomic) { + for (Entry entry : commands.values()) { + BatchCommandData multiCommand = new BatchCommandData(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet()); + entry.getCommands().addFirst(multiCommand); + BatchCommandData execCommand = new BatchCommandData(RedisCommands.EXEC, new Object[] {}, index.incrementAndGet()); + entry.getCommands().add(execCommand); + } + } + if (skipResult) { for (Entry entry : commands.values()) { BatchCommandData offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet()); @@ -226,12 +235,16 @@ public class CommandBatchService extends CommandAsyncService { List responses = new ArrayList(entries.size()); int syncedSlaves = 0; for (BatchCommandData commandEntry : entries) { - if (!isWaitCommand(commandEntry)) { + if (!isWaitCommand(commandEntry) + && !commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) + && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) { Object entryResult = commandEntry.getPromise().getNow(); entryResult = tryHandleReference(entryResult); responses.add(entryResult); } else { - syncedSlaves = (Integer) commandEntry.getPromise().getNow(); + if (isWaitCommand(commandEntry)) { + syncedSlaves = (Integer) commandEntry.getPromise().getNow(); + } } } @@ -262,13 +275,13 @@ public class CommandBatchService extends CommandAsyncService { } for (java.util.Map.Entry e : commands.entrySet()) { - execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, skipResult, responseTimeout, retryAttempts, retryInterval); + execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, skipResult, responseTimeout, retryAttempts, retryInterval, atomic); } return resultPromise; } private void execute(final Entry entry, final NodeSource source, final RPromise mainPromise, final AtomicInteger slots, - final int attempt, final boolean noResult, final long responseTimeout, final int retryAttempts, final long retryInterval) { + final int attempt, final boolean noResult, final long responseTimeout, final int retryAttempts, final long retryInterval, final boolean atomic) { if (mainPromise.isCancelled()) { free(entry); return; @@ -364,7 +377,7 @@ public class CommandBatchService extends CommandAsyncService { int count = attempt + 1; mainPromise.removeListener(mainPromiseListener); - execute(entry, source, mainPromise, slots, count, noResult, responseTimeout, retryAttempts, retryInterval); + execute(entry, source, mainPromise, slots, count, noResult, responseTimeout, retryAttempts, retryInterval, atomic); } }; @@ -380,7 +393,7 @@ public class CommandBatchService extends CommandAsyncService { connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout, attempts); + checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout, attempts, atomic); } }); @@ -398,19 +411,19 @@ public class CommandBatchService extends CommandAsyncService { RedisMovedException ex = (RedisMovedException)future.cause(); entry.clearErrors(); NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED); - execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); + execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic); return; } if (future.cause() instanceof RedisAskException) { RedisAskException ex = (RedisAskException)future.cause(); entry.clearErrors(); NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK); - execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); + execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic); return; } if (future.cause() instanceof RedisLoadingException) { entry.clearErrors(); - execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); + execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic); return; } if (future.cause() instanceof RedisTryAgainException) { @@ -418,7 +431,7 @@ public class CommandBatchService extends CommandAsyncService { connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); + execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic); } }, 1, TimeUnit.SECONDS); return; @@ -438,7 +451,7 @@ public class CommandBatchService extends CommandAsyncService { } private void checkWriteFuture(Entry entry, final RPromise attemptPromise, AsyncDetails details, - final RedisConnection connection, ChannelFuture future, boolean noResult, long responseTimeout, int attempts) { + final RedisConnection connection, ChannelFuture future, long responseTimeout, int attempts) { if (future.isCancelled() || attemptPromise.isDone()) { return; } @@ -471,7 +484,7 @@ public class CommandBatchService extends CommandAsyncService { private void checkConnectionFuture(final Entry entry, final NodeSource source, final RPromise mainPromise, final RPromise attemptPromise, final AsyncDetails details, - RFuture connFuture, final boolean noResult, final long responseTimeout, final int attempts) { + RFuture connFuture, final boolean noResult, final long responseTimeout, final int attempts, boolean atomic) { if (connFuture.isCancelled()) { return; } @@ -495,20 +508,20 @@ public class CommandBatchService extends CommandAsyncService { list.add(new CommandData(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); } for (BatchCommandData c : entry.getCommands()) { - if (c.getPromise().isSuccess() && !isWaitCommand(c)) { + if (c.getPromise().isSuccess() && !isWaitCommand(c) && !atomic) { // skip successful commands continue; } list.add(c); } - ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult)); + ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult, atomic)); details.setWriteFuture(future); details.getWriteFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - checkWriteFuture(entry, attemptPromise, details, connection, future, noResult, responseTimeout, attempts); + checkWriteFuture(entry, attemptPromise, details, connection, future, responseTimeout, attempts); } }); diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java index 89daff0a1..735ca3660 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java @@ -61,7 +61,7 @@ public class CommandReactiveBatchService extends CommandReactiveService { batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt); } - public RFuture> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) { + public RFuture> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) { for (Publisher publisher : publishers) { publisher.subscribe(new DefaultSubscriber() { @Override @@ -71,7 +71,7 @@ public class CommandReactiveBatchService extends CommandReactiveService { }); } - return batchService.executeAsync(syncSlaves, syncTimeout, skipResult, responseTimeout, retryAttempts, retryInterval); + return batchService.executeAsync(syncSlaves, syncTimeout, skipResult, responseTimeout, retryAttempts, retryInterval, atomic); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 4b0ad80ad..6d00d9254 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -63,6 +63,7 @@ public class RedissonBatchReactive implements RBatchReactive { private int syncSlaves; private long syncTimeout; private boolean skipResult; + private boolean atomic; public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { this.evictionScheduler = evictionScheduler; @@ -219,11 +220,16 @@ public class RedissonBatchReactive implements RBatchReactive { return new NettyFuturePublisher>(new Supplier>>() { @Override public RFuture> get() { - return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval); + return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval, atomic); } }); } + public RBatchReactive atomic() { + this.atomic = true; + return this; + } + @Override public RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit) { this.syncSlaves = slaves; diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index 9433491b6..aaa3a20be 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -121,6 +121,63 @@ public class RedissonBatchTest extends BaseTest { batch.execute(); } + @Test + public void testAtomic() { + RBatch batch = redisson.createBatch(); + batch.atomic(); + RFuture f1 = batch.getAtomicLong("A1").addAndGetAsync(1); + RFuture f2 = batch.getAtomicLong("A2").addAndGetAsync(2); + RFuture f3 = batch.getAtomicLong("A3").addAndGetAsync(3); + RFuture d1 = batch.getKeys().deleteAsync("A1", "A2"); + BatchResult f = batch.execute(); + + List list = (List) f.getResponses(); + assertThat(list).containsExactly(1L, 2L, 3L, 2L); + assertThat(f1.getNow()).isEqualTo(1); + assertThat(f2.getNow()).isEqualTo(2); + assertThat(f3.getNow()).isEqualTo(3); + assertThat(d1.getNow()).isEqualTo(2); + } + + @Test + public void testAtomicSyncSlaves() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RBatch batch = redisson.createBatch(); + for (int i = 0; i < 10; i++) { + batch.getAtomicLong("{test}" + i).addAndGetAsync(i); + } + + batch.atomic(); + batch.syncSlaves(1, 1, TimeUnit.SECONDS); + BatchResult result = batch.execute(); + assertThat(result.getSyncedSlaves()).isEqualTo(1); + int i = 0; + for (Object res : result.getResponses()) { + assertThat((Long)res).isEqualTo(i++); + } + + process.shutdown(); + } + + @Test public void testDifferentCodecs() { RBatch b = redisson.createBatch(); From 7ead223c93198f4cb0dd93c969631d25fd444eae Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 17 Jan 2018 10:03:29 +0300 Subject: [PATCH 02/13] Fixed - RLocalCachedMap.putAll gets stuck. #1245 --- .../org/redisson/RedissonLocalCachedMap.java | 39 ++++++++++--------- .../redisson/RedissonLocalCachedMapTest.java | 22 ++++------- 2 files changed, 29 insertions(+), 32 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index d92d87495..7496e028d 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -68,6 +68,7 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.misc.Hash; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +77,6 @@ import io.netty.buffer.Unpooled; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThreadLocalRandom; /** * @@ -239,6 +239,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + log.error("Can't check existance", future.cause()); return; } @@ -953,7 +954,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } } - final RPromise> promise = newPromise(); + final RPromise> promise = new RedissonPromise>(); RFuture> future = super.getAllAsync(mapKeys); future.addListener(new FutureListener>() { @Override @@ -984,19 +985,16 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture putAllOperationAsync(final Map map) { + protected RFuture putAllOperationAsync(final Map map) { List params = new ArrayList(map.size()*3); params.add(invalidateEntryOnChange); params.add(map.size()*2); byte[][] hashes = new byte[map.size()][]; int i = 0; - int payloadSize = 0; for (java.util.Map.Entry t : map.entrySet()) { ByteBuf mapKey = encodeMapKey(t.getKey()); - payloadSize += mapKey.readableBytes(); ByteBuf mapValue = encodeMapValue(t.getValue()); - payloadSize += mapValue.readableBytes(); params.add(mapKey); params.add(mapValue); CacheKey cacheKey = toCacheKey(mapKey); @@ -1004,7 +1002,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R i++; } - ByteBuf msgEncoded; + ByteBuf msgEncoded = null; if (syncStrategy == SyncStrategy.UPDATE) { List entries = new ArrayList(); for (int j = 2; j < params.size(); j += 2) { @@ -1024,23 +1022,25 @@ public class RedissonLocalCachedMap extends RedissonMap implements R byte[] entryId = generateLogEntryId(hash); params.add(time); params.add(entryId); - payloadSize += entryId.length + 8; } } - params.add(msgEncoded); - payloadSize += msgEncoded.readableBytes(); - - log.debug("Payload size passed to putAll method: {}", payloadSize); + if (msgEncoded != null) { + params.add(msgEncoded); + } - final RPromise result = newPromise(); + final RPromise result = new RedissonPromise(); RFuture future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, - "redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));" + "for i=3, tonumber(ARGV[2]) + 2, 5000 do " + + "redis.call('hmset', KEYS[1], unpack(ARGV, i, math.min(i+4999, tonumber(ARGV[2]) + 2))); " + + "end; " + "if ARGV[1] == '1' then " + "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + "end;" + "if ARGV[1] == '2' then " - + "redis.call('zadd', KEYS[3], unpack(ARGV, tonumber(ARGV[2]) + 2 + 1, #ARGV - 1));" + + "for i=tonumber(ARGV[2]) + 2 + 1, #ARGV - 1, 5000 do " + + "redis.call('hmset', KEYS[3], unpack(ARGV, i, math.min(i+4999, #ARGV - 1))); " + + "end; " + "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + "end;", Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), @@ -1050,6 +1050,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + result.tryFailure(future.cause()); return; } @@ -1125,7 +1126,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R result.add((V) value.getValue()); } - final RPromise> promise = newPromise(); + final RPromise> promise = new RedissonPromise>(); RFuture> future = commandExecutor.evalReadAsync(getName(), codec, ALL_KEYS, "local entries = redis.call('hgetall', KEYS[1]); " + "local result = {};" @@ -1172,13 +1173,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R result.put((K)value.getKey(), (V)value.getValue()); } - final RPromise> promise = newPromise(); + final RPromise> promise = new RedissonPromise>(); RFuture> future = readAll(ALL_MAP, mapKeys, result); future.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { if (!future.isSuccess()) { + promise.tryFailure(future.cause()); return; } @@ -1215,13 +1217,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R result.add(new AbstractMap.SimpleEntry((K)value.getKey(), (V)value.getValue())); } - final RPromise>> promise = newPromise(); + final RPromise>> promise = new RedissonPromise>>(); RFuture>> future = readAll(ALL_ENTRIES, mapKeys, result); future.addListener(new FutureListener>>() { @Override public void operationComplete(Future>> future) throws Exception { if (!future.isSuccess()) { + promise.tryFailure(future.cause()); return; } diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index b04ded83a..f5a9d848d 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -126,24 +126,18 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { return redisson.getLocalCachedMap("test", options); } -// @Test - public void testBigData() throws InterruptedException { + @Test + public void testBigPutAll() throws InterruptedException { RLocalCachedMap m = redisson.getLocalCachedMap("testValuesWithNearCache2", - LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU)); + LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).syncStrategy(SyncStrategy.INVALIDATE)); - for (int i = 0; i < 100; i++) { - for (int k = 0; k < 1000; k++) { - Map map = new HashMap<>(); - map.put("" + k * i, "" + k * i); - m.putAll(map); - } - System.out.println(i); + Map map = new HashMap<>(); + for (int k = 0; k < 10000; k++) { + map.put("" + k, "" + k); } + m.putAll(map); - System.out.println("done"); - - Thread.sleep(1000000); - + assertThat(m.size()).isEqualTo(10000); } From 276d1e4bbd199a7ae64244d22556dd2d46c89ca8 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 17 Jan 2018 10:07:40 +0300 Subject: [PATCH 03/13] Fixed RLocalCachedMap.putOperationAsync --- .../src/main/java/org/redisson/RedissonLocalCachedMap.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 7496e028d..c3abc3cdf 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -428,7 +428,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override protected RFuture putOperationAsync(K key, V value) { ByteBuf mapKey = encodeMapKey(key); - ByteBuf mapValue = encodeMapKey(value); + ByteBuf mapValue = encodeMapValue(value); CacheKey cacheKey = toCacheKey(mapKey); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); ByteBuf msg = createSyncMessage(mapKey, mapValue, cacheKey); @@ -446,7 +446,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end;" + "return v; ", Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), - mapKey, encodeMapValue(value), msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId); + mapKey, mapValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId); } protected ByteBuf createSyncMessage(ByteBuf mapKey, ByteBuf mapValue, CacheKey cacheKey) { From 700dc2050c2ccf5103b10417ec2131586ae8f777 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 18 Jan 2018 09:06:34 +0300 Subject: [PATCH 04/13] refactoring --- redisson/src/main/java/org/redisson/RedissonGeo.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonGeo.java b/redisson/src/main/java/org/redisson/RedissonGeo.java index 444d776c4..1377941cb 100644 --- a/redisson/src/main/java/org/redisson/RedissonGeo.java +++ b/redisson/src/main/java/org/redisson/RedissonGeo.java @@ -30,6 +30,7 @@ import org.redisson.api.RGeo; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.CodecDecoder; @@ -91,7 +92,7 @@ public class RedissonGeo extends RedissonScoredSortedSet implements RGeo extends RedissonScoredSortedSet implements RGeo distAsync(V firstMember, V secondMember, GeoUnit geoUnit) { - return commandExecutor.readAsync(getName(), codec, RedisCommands.GEODIST, getName(), encode(firstMember), encode(secondMember), geoUnit); + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.GEODIST, getName(), encode(firstMember), encode(secondMember), geoUnit); } @Override @@ -117,7 +118,7 @@ public class RedissonGeo extends RedissonScoredSortedSet implements RGeo> command = new RedisCommand>("GEOHASH", new MapGetAllDecoder((List)Arrays.asList(members), 0)); - return commandExecutor.readAsync(getName(), codec, command, params.toArray()); + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, command, params.toArray()); } @Override @@ -135,7 +136,7 @@ public class RedissonGeo extends RedissonScoredSortedSet implements RGeo> decoder = new ListMultiDecoder(new GeoPositionDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoPositionMapDecoder((List)Arrays.asList(members))); RedisCommand> command = new RedisCommand>("GEOPOS", decoder); - return commandExecutor.readAsync(getName(), codec, command, params.toArray()); + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, command, params.toArray()); } @Override From b4ef4dbd550d28d6bc29e6ecde60dc7bd85cb736 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 18 Jan 2018 11:03:42 +0300 Subject: [PATCH 05/13] Useless RedissonConcurrentMapTest removed --- .../redisson/RedissonConcurrentMapTest.java | 177 ------------------ 1 file changed, 177 deletions(-) delete mode 100644 redisson/src/test/java/org/redisson/RedissonConcurrentMapTest.java diff --git a/redisson/src/test/java/org/redisson/RedissonConcurrentMapTest.java b/redisson/src/test/java/org/redisson/RedissonConcurrentMapTest.java deleted file mode 100644 index a94b51cb6..000000000 --- a/redisson/src/test/java/org/redisson/RedissonConcurrentMapTest.java +++ /dev/null @@ -1,177 +0,0 @@ -package org.redisson; - -import java.security.SecureRandom; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -import org.junit.Assert; -import org.junit.Test; - -public class RedissonConcurrentMapTest extends BaseConcurrentTest { - - @Test - public void testSingleReplaceOldValue_SingleInstance() throws InterruptedException { - final String name = "testSingleReplaceOldValue_SingleInstance"; - - ConcurrentMap map = BaseTest.createInstance().getMap(name); - map.put("1", "122"); - - testSingleInstanceConcurrency(100, r -> { - ConcurrentMap map1 = r.getMap(name); - map1.replace("1", "122", "32"); - map1.replace("1", "0", "31"); - }); - - ConcurrentMap testMap = BaseTest.createInstance().getMap(name); - Assert.assertEquals("32", testMap.get("1")); - - assertMapSize(1, name); - } - - @Test - public void testSingleRemoveValue_SingleInstance() throws InterruptedException { - final String name = "testSingleRemoveValue_SingleInstance"; - - ConcurrentMap map = BaseTest.createInstance().getMap(name); - map.putIfAbsent("1", "0"); - testSingleInstanceConcurrency(100, r -> { - ConcurrentMap map1 = r.getMap(name); - map1.remove("1", "0"); - }); - - assertMapSize(0, name); - } - - @Test - public void testSingleReplace_SingleInstance() throws InterruptedException { - final String name = "testSingleReplace_SingleInstance"; - - ConcurrentMap map = BaseTest.createInstance().getMap(name); - map.put("1", "0"); - - testSingleInstanceConcurrency(100, r -> { - ConcurrentMap map1 = r.getMap(name); - map1.replace("1", "3"); - }); - - ConcurrentMap testMap = BaseTest.createInstance().getMap(name); - Assert.assertEquals("3", testMap.get("1")); - - assertMapSize(1, name); - } - - @Test - public void test_Multi_Replace_MultiInstance() throws InterruptedException { - final String name = "test_Multi_Replace_MultiInstance"; - - ConcurrentMap map = BaseTest.createInstance().getMap(name); - for (int i = 0; i < 5; i++) { - map.put(i, 1); - } - - final SecureRandom secureRandom = new SecureRandom(); - testSingleInstanceConcurrency(100, r -> { - ConcurrentMap map1 = r.getMap(name); - Assert.assertNotNull(map1.replace(secureRandom.nextInt(5), 2)); - }); - - ConcurrentMap testMap = BaseTest.createInstance().getMap(name); - for (Integer value : testMap.values()) { - Assert.assertEquals(2, (int)value); - } - assertMapSize(5, name); - - } - - @Test - public void test_Multi_RemoveValue_MultiInstance() throws InterruptedException { - final String name = "test_Multi_RemoveValue_MultiInstance"; - - ConcurrentMap map = BaseTest.createInstance().getMap(name); - for (int i = 0; i < 10; i++) { - map.put(i, 1); - } - - final SecureRandom secureRandom = new SecureRandom(); - testMultiInstanceConcurrency(100, r -> { - ConcurrentMap map1 = r.getMap(name); - map1.remove(secureRandom.nextInt(10), 1); - }); - - assertMapSize(0, name); - } - - @Test - public void testSinglePutIfAbsent_SingleInstance() throws InterruptedException { - final String name = "testSinglePutIfAbsent_SingleInstance"; - - ConcurrentMap map = BaseTest.createInstance().getMap(name); - map.putIfAbsent("1", "0"); - testSingleInstanceConcurrency(100, r -> { - ConcurrentMap map1 = r.getMap(name); - map1.putIfAbsent("1", "1"); - }); - - ConcurrentMap testMap = BaseTest.createInstance().getMap(name); - Assert.assertEquals("0", testMap.get("1")); - - assertMapSize(1, name); - } - - @Test - public void testMultiPutIfAbsent_SingleInstance() throws InterruptedException { - final String name = "testMultiPutIfAbsent_SingleInstance"; - testSingleInstanceConcurrency(100, r -> { - ConcurrentMap map = r.getMap(name); - map.putIfAbsent("" + Math.random(), "1"); - }); - - assertMapSize(100, name); - } - - @Test - public void testMultiPutIfAbsent_MultiInstance() throws InterruptedException { - final String name = "testMultiPutIfAbsent_MultiInstance"; - testMultiInstanceConcurrency(100, r -> { - ConcurrentMap map = r.getMap(name); - map.putIfAbsent("" + Math.random(), "1"); - }); - - assertMapSize(100, name); - } - - private void assertMapSize(int size, String name) { - Map map = BaseTest.createInstance().getMap(name); - Assert.assertEquals(size, map.size()); - clear(map); - } - - @Test - public void testMultiPut_SingleInstance() throws InterruptedException { - final String name = "testMultiPut_SingleInstance"; - testSingleInstanceConcurrency(100, r -> { - Map map = r.getMap(name); - map.put("" + Math.random(), "1"); - }); - - assertMapSize(100, name); - } - - - @Test - public void testMultiPut_MultiInstance() throws InterruptedException { - final String name = "testMultiPut_MultiInstance"; - testMultiInstanceConcurrency(100, r -> { - ConcurrentMap map = r.getMap(name); - map.putIfAbsent("" + Math.random(), "1"); - }); - - assertMapSize(100, name); - } - - private void clear(Map map) { - map.clear(); - Assert.assertEquals(0, map.size()); - } - -} From e30739cb19931d9677d7489f71057a00bae1ece1 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 18 Jan 2018 12:41:57 +0300 Subject: [PATCH 06/13] RedissonSemaphoreTest.testConcurrency_MultiInstance_10_permits removed --- .../org/redisson/RedissonSemaphoreTest.java | 39 ------------------- 1 file changed, 39 deletions(-) diff --git a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java index 9994de384..d0551d510 100644 --- a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java @@ -255,43 +255,4 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { assertThat(lockedCounter.get()).isEqualTo(iterations); } - @Test - public void testConcurrency_MultiInstance_10_permits() throws InterruptedException { - Assume.assumeFalse(RedissonRuntimeEnvironment.isTravis); - int iterations = 100; - final AtomicInteger lockedCounter = new AtomicInteger(); - - RSemaphore s = redisson.getSemaphore("test"); - s.trySetPermits(10); - - final AtomicInteger checkPermits = new AtomicInteger(s.availablePermits()); - final CyclicBarrier barrier = new CyclicBarrier(s.availablePermits()); - testMultiInstanceConcurrencySequentiallyLaunched(iterations, r -> { - RSemaphore s1 = r.getSemaphore("test"); - try { - s1.acquire(); - barrier.await(); - if (checkPermits.decrementAndGet() > 0) { - assertThat(s1.availablePermits()).isEqualTo(0); - assertThat(s1.tryAcquire()).isFalse(); - } else { - Thread.sleep(50); - } - }catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - }catch (BrokenBarrierException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - int value = lockedCounter.get(); - lockedCounter.set(value + 1); - s1.release(); - }); - - System.out.println(lockedCounter.get()); - - assertThat(lockedCounter.get()).isLessThan(iterations); - } - } From fa48e706ff4f48bda8494b8daf9ee1d4da1f7732 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 18 Jan 2018 13:24:13 +0300 Subject: [PATCH 07/13] RBucket.getAndDelete method added. #1247 --- .../src/main/java/org/redisson/RedissonBucket.java | 14 ++++++++++++++ .../src/main/java/org/redisson/api/RBucket.java | 2 ++ .../main/java/org/redisson/api/RBucketAsync.java | 2 ++ .../test/java/org/redisson/RedissonBucketTest.java | 9 +++++++++ 4 files changed, 27 insertions(+) diff --git a/redisson/src/main/java/org/redisson/RedissonBucket.java b/redisson/src/main/java/org/redisson/RedissonBucket.java index 9661d1636..98ae352d8 100644 --- a/redisson/src/main/java/org/redisson/RedissonBucket.java +++ b/redisson/src/main/java/org/redisson/RedissonBucket.java @@ -102,6 +102,20 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { return commandExecutor.readAsync(getName(), codec, RedisCommands.GET, getName()); } + @Override + public V getAndDelete() { + return get(getAndDeleteAsync()); + } + + @Override + public RFuture getAndDeleteAsync() { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, + "local currValue = redis.call('get', KEYS[1]); " + + "redis.call('del', KEYS[1]); " + + "return currValue; ", + Collections.singletonList(getName())); + } + @Override public long size() { return get(sizeAsync()); diff --git a/redisson/src/main/java/org/redisson/api/RBucket.java b/redisson/src/main/java/org/redisson/api/RBucket.java index f05f42f12..d1b0ec26d 100644 --- a/redisson/src/main/java/org/redisson/api/RBucket.java +++ b/redisson/src/main/java/org/redisson/api/RBucket.java @@ -35,6 +35,8 @@ public interface RBucket extends RExpirable, RBucketAsync { V get(); + V getAndDelete(); + boolean trySet(V value); boolean trySet(V value, long timeToLive, TimeUnit timeUnit); diff --git a/redisson/src/main/java/org/redisson/api/RBucketAsync.java b/redisson/src/main/java/org/redisson/api/RBucketAsync.java index d3658fa2c..9f41ef0d1 100644 --- a/redisson/src/main/java/org/redisson/api/RBucketAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBucketAsync.java @@ -34,6 +34,8 @@ public interface RBucketAsync extends RExpirableAsync { RFuture sizeAsync(); RFuture getAsync(); + + RFuture getAndDeleteAsync(); RFuture trySetAsync(V value); diff --git a/redisson/src/test/java/org/redisson/RedissonBucketTest.java b/redisson/src/test/java/org/redisson/RedissonBucketTest.java index c72c91e82..55c087330 100755 --- a/redisson/src/test/java/org/redisson/RedissonBucketTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBucketTest.java @@ -12,6 +12,15 @@ import org.redisson.api.RBucket; public class RedissonBucketTest extends BaseTest { + @Test + public void testGetAndDelete() { + RBucket al = redisson.getBucket("test"); + al.set(10); + assertThat(al.getAndDelete()).isEqualTo(10); + assertThat(al.isExists()).isFalse(); + assertThat(al.getAndDelete()).isNull(); + } + @Test public void testSize() { RBucket bucket = redisson.getBucket("testCompareAndSet"); From 6bcd474c6558e91ef8c8d47c09fd3a5a0de9c9d9 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 18 Jan 2018 13:25:57 +0300 Subject: [PATCH 08/13] RAtomicLong.getAndDelete and RAtomicDouble.getAndDelete methods added. #1248 --- .../org/redisson/RedissonAtomicDouble.java | 21 ++++++++++++++++--- .../java/org/redisson/RedissonAtomicLong.java | 18 ++++++++++++++-- .../java/org/redisson/api/RAtomicDouble.java | 7 +++++++ .../org/redisson/api/RAtomicDoubleAsync.java | 12 +++++++++++ .../java/org/redisson/api/RAtomicLong.java | 7 +++++++ .../org/redisson/api/RAtomicLongAsync.java | 12 +++++++++++ .../client/protocol/RedisCommands.java | 8 +++++-- .../DoubleNullSafeReplayConvertor.java | 19 +++++++++++++++++ .../redisson/RedissonAtomicDoubleTest.java | 17 +++++++++++++++ .../org/redisson/RedissonAtomicLongTest.java | 19 +++++++++++++++++ 10 files changed, 133 insertions(+), 7 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/client/protocol/convertor/DoubleNullSafeReplayConvertor.java diff --git a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java index 61ceba620..e23d67c23 100644 --- a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java +++ b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java @@ -60,7 +60,8 @@ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDo @Override public RFuture compareAndSetAsync(double expect, double update) { return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if tonumber(redis.call('get', KEYS[1])) == tonumber(ARGV[1]) then " + "local value = redis.call('get', KEYS[1]);" + + "if (value == false and tonumber(ARGV[1]) == 0) or (tonumber(value) == tonumber(ARGV[1])) then " + "redis.call('set', KEYS[1], ARGV[2]); " + "return 1 " + "else " @@ -80,12 +81,26 @@ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDo @Override public double get() { - return addAndGet(0); + return get(getAsync()); } + @Override + public double getAndDelete() { + return get(getAndDeleteAsync()); + } + + @Override + public RFuture getAndDeleteAsync() { + return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_DOUBLE, + "local currValue = redis.call('get', KEYS[1]); " + + "redis.call('del', KEYS[1]); " + + "return currValue; ", + Collections.singletonList(getName())); + } + @Override public RFuture getAsync() { - return addAndGetAsync(0); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.GET_DOUBLE, getName()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonAtomicLong.java b/redisson/src/main/java/org/redisson/RedissonAtomicLong.java index 69b8bb647..22d65d1f2 100644 --- a/redisson/src/main/java/org/redisson/RedissonAtomicLong.java +++ b/redisson/src/main/java/org/redisson/RedissonAtomicLong.java @@ -66,6 +66,20 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong + "end", Collections.singletonList(getName()), expect, update); } + + @Override + public long getAndDelete() { + return get(getAndDeleteAsync()); + } + + @Override + public RFuture getAndDeleteAsync() { + return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_LONG_SAFE, + "local currValue = redis.call('get', KEYS[1]); " + + "redis.call('del', KEYS[1]); " + + "return currValue; ", + Collections.singletonList(getName())); + } @Override public long decrementAndGet() { @@ -79,12 +93,12 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong @Override public long get() { - return addAndGet(0); + return get(getAsync()); } @Override public RFuture getAsync() { - return addAndGetAsync(0); + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.GET_LONG, getName()); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RAtomicDouble.java b/redisson/src/main/java/org/redisson/api/RAtomicDouble.java index e52b395dc..7ec2abcb5 100644 --- a/redisson/src/main/java/org/redisson/api/RAtomicDouble.java +++ b/redisson/src/main/java/org/redisson/api/RAtomicDouble.java @@ -62,6 +62,13 @@ public interface RAtomicDouble extends RExpirable, RAtomicDoubleAsync { * @return the current value */ double get(); + + /** + * Gets and deletes object + * + * @return the current value + */ + double getAndDelete(); /** * Atomically adds the given value to the current value. diff --git a/redisson/src/main/java/org/redisson/api/RAtomicDoubleAsync.java b/redisson/src/main/java/org/redisson/api/RAtomicDoubleAsync.java index d83dbf883..9889655ab 100644 --- a/redisson/src/main/java/org/redisson/api/RAtomicDoubleAsync.java +++ b/redisson/src/main/java/org/redisson/api/RAtomicDoubleAsync.java @@ -15,6 +15,11 @@ */ package org.redisson.api; +/** + * + * @author Nikita Koksharov + * + */ public interface RAtomicDoubleAsync extends RExpirableAsync { RFuture compareAndSetAsync(double expect, double update); @@ -25,6 +30,13 @@ public interface RAtomicDoubleAsync extends RExpirableAsync { RFuture getAsync(); + /** + * Gets and deletes object + * + * @return the current value + */ + RFuture getAndDeleteAsync(); + RFuture getAndAddAsync(double delta); RFuture getAndSetAsync(double newValue); diff --git a/redisson/src/main/java/org/redisson/api/RAtomicLong.java b/redisson/src/main/java/org/redisson/api/RAtomicLong.java index 714f56fcb..c2d8da378 100644 --- a/redisson/src/main/java/org/redisson/api/RAtomicLong.java +++ b/redisson/src/main/java/org/redisson/api/RAtomicLong.java @@ -63,6 +63,13 @@ public interface RAtomicLong extends RExpirable, RAtomicLongAsync { */ long get(); + /** + * Gets and deletes object + * + * @return the current value + */ + long getAndDelete(); + /** * Atomically adds the given value to the current value. * diff --git a/redisson/src/main/java/org/redisson/api/RAtomicLongAsync.java b/redisson/src/main/java/org/redisson/api/RAtomicLongAsync.java index 02bffedbd..c4cf387d1 100644 --- a/redisson/src/main/java/org/redisson/api/RAtomicLongAsync.java +++ b/redisson/src/main/java/org/redisson/api/RAtomicLongAsync.java @@ -15,6 +15,11 @@ */ package org.redisson.api; +/** + * + * @author Nikita Koksharov + * + */ public interface RAtomicLongAsync extends RExpirableAsync { RFuture compareAndSetAsync(long expect, long update); @@ -24,6 +29,13 @@ public interface RAtomicLongAsync extends RExpirableAsync { RFuture decrementAndGetAsync(); RFuture getAsync(); + + /** + * Gets and deletes object + * + * @return the current value + */ + RFuture getAndDeleteAsync(); RFuture getAndAddAsync(long delta); diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 48299979a..c3eac37eb 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -32,6 +32,7 @@ import org.redisson.client.protocol.convertor.BooleanNullReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNullSafeReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.DoubleNullSafeReplayConvertor; import org.redisson.client.protocol.convertor.DoubleReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.KeyValueConvertor; @@ -133,7 +134,7 @@ public interface RedisCommands { RedisCommand>> ZRANGE_ENTRY = new RedisCommand>>("ZRANGE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZRANGEBYSCORE_ENTRY = new RedisCommand>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder()); RedisCommand> ZSCAN = new RedisCommand>("ZSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ScoredSortedSetScanDecoder(), new ScoredSortedSetScanReplayDecoder())); - RedisStrictCommand ZINCRBY = new RedisStrictCommand("ZINCRBY", new DoubleReplayConvertor()); + RedisStrictCommand ZINCRBY = new RedisStrictCommand("ZINCRBY", new DoubleNullSafeReplayConvertor()); RedisCommand> SCAN = new RedisCommand>("SCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder())); RedisStrictCommand RANDOM_KEY = new RedisStrictCommand("RANDOMKEY", new StringDataDecoder()); @@ -221,7 +222,9 @@ public interface RedisCommands { RedisStrictCommand EVAL_STRING = new RedisStrictCommand("EVAL", new StringReplayDecoder()); RedisStrictCommand EVAL_STRING_DATA = new RedisStrictCommand("EVAL", new StringDataDecoder()); RedisStrictCommand EVAL_INTEGER = new RedisStrictCommand("EVAL", new IntegerReplayConvertor()); + RedisStrictCommand EVAL_DOUBLE = new RedisStrictCommand("EVAL", new DoubleNullSafeReplayConvertor()); RedisStrictCommand EVAL_LONG = new RedisStrictCommand("EVAL"); + RedisStrictCommand EVAL_LONG_SAFE = new RedisStrictCommand("EVAL", new LongReplayConvertor()); RedisStrictCommand EVAL_VOID = new RedisStrictCommand("EVAL", new VoidReplayConvertor()); RedisCommand> EVAL_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); RedisCommand> EVAL_SET = new RedisCommand>("EVAL", new ObjectSetReplayDecoder()); @@ -235,7 +238,7 @@ public interface RedisCommands { RedisStrictCommand INCR = new RedisStrictCommand("INCR"); RedisStrictCommand INCRBY = new RedisStrictCommand("INCRBY"); - RedisStrictCommand INCRBYFLOAT = new RedisStrictCommand("INCRBYFLOAT", new DoubleReplayConvertor()); + RedisStrictCommand INCRBYFLOAT = new RedisStrictCommand("INCRBYFLOAT", new DoubleNullSafeReplayConvertor()); RedisStrictCommand DECR = new RedisStrictCommand("DECR"); RedisStrictCommand AUTH = new RedisStrictCommand("AUTH", new VoidReplayConvertor()); @@ -281,6 +284,7 @@ public interface RedisCommands { RedisCommand GET = new RedisCommand("GET"); RedisStrictCommand GET_LONG = new RedisStrictCommand("GET", new LongReplayConvertor()); RedisStrictCommand GET_INTEGER = new RedisStrictCommand("GET", new IntegerReplayConvertor()); + RedisStrictCommand GET_DOUBLE = new RedisStrictCommand("GET", new DoubleNullSafeReplayConvertor()); RedisCommand GETSET = new RedisCommand("GETSET"); RedisCommand GETRANGE = new RedisCommand("GETRANGE"); RedisCommand APPEND = new RedisCommand("APPEND"); diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/DoubleNullSafeReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/DoubleNullSafeReplayConvertor.java new file mode 100644 index 000000000..c1c0b1c5c --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/DoubleNullSafeReplayConvertor.java @@ -0,0 +1,19 @@ +package org.redisson.client.protocol.convertor; + +/** + * + * @author Nikita Koksharov + * + */ +public class DoubleNullSafeReplayConvertor extends DoubleReplayConvertor { + + @Override + public Double convert(Object obj) { + Double r = super.convert(obj); + if (r == null) { + return 0.0; + } + return r; + } + +} diff --git a/redisson/src/test/java/org/redisson/RedissonAtomicDoubleTest.java b/redisson/src/test/java/org/redisson/RedissonAtomicDoubleTest.java index 3a4850ca0..0d69e2186 100644 --- a/redisson/src/test/java/org/redisson/RedissonAtomicDoubleTest.java +++ b/redisson/src/test/java/org/redisson/RedissonAtomicDoubleTest.java @@ -9,6 +9,23 @@ import org.redisson.api.RAtomicDouble; public class RedissonAtomicDoubleTest extends BaseTest { + @Test + public void testGetZero() { + RAtomicDouble ad2 = redisson.getAtomicDouble("test"); + assertThat(ad2.get()).isZero(); + } + + @Test + public void testGetAndDelete() { + RAtomicDouble al = redisson.getAtomicDouble("test"); + al.set(10.34); + assertThat(al.getAndDelete()).isEqualTo(10.34); + assertThat(al.isExists()).isFalse(); + + RAtomicDouble ad2 = redisson.getAtomicDouble("test2"); + assertThat(ad2.getAndDelete()).isZero(); + } + @Test public void testCompareAndSet() { RAtomicDouble al = redisson.getAtomicDouble("test"); diff --git a/redisson/src/test/java/org/redisson/RedissonAtomicLongTest.java b/redisson/src/test/java/org/redisson/RedissonAtomicLongTest.java index a35ff024f..170368e51 100644 --- a/redisson/src/test/java/org/redisson/RedissonAtomicLongTest.java +++ b/redisson/src/test/java/org/redisson/RedissonAtomicLongTest.java @@ -1,11 +1,30 @@ package org.redisson; +import static org.assertj.core.api.Assertions.assertThat; + import org.junit.Assert; import org.junit.Test; import org.redisson.api.RAtomicLong; public class RedissonAtomicLongTest extends BaseTest { + @Test + public void testGetZero() { + RAtomicLong ad2 = redisson.getAtomicLong("test"); + assertThat(ad2.get()).isZero(); + } + + @Test + public void testGetAndDelete() { + RAtomicLong al = redisson.getAtomicLong("test"); + al.set(10); + assertThat(al.getAndDelete()).isEqualTo(10); + assertThat(al.isExists()).isFalse(); + + RAtomicLong ad2 = redisson.getAtomicLong("test2"); + assertThat(ad2.getAndDelete()).isZero(); + } + @Test public void testCompareAndSetZero() { RAtomicLong al = redisson.getAtomicLong("test"); From e234d6ca7742bf5a508c6189b24b2e4fe47fd98d Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 18 Jan 2018 13:26:20 +0300 Subject: [PATCH 09/13] refactoring --- .../java/org/redisson/RedissonBaseAdder.java | 38 ++++++++----------- .../org/redisson/RedissonDoubleAdder.java | 4 +- .../java/org/redisson/RedissonLongAdder.java | 4 +- 3 files changed, 20 insertions(+), 26 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java index 37c65e45e..91eba4573 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java @@ -15,17 +15,13 @@ */ package org.redisson; -import java.util.Arrays; - import org.redisson.api.RFuture; import org.redisson.api.RSemaphore; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.LongCodec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.LongAdder; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; @@ -109,20 +105,17 @@ public abstract class RedissonBaseAdder extends RedissonExpira public RFuture sumAsync() { final RPromise result = new RedissonPromise(); - RFuture future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER, - "redis.call('del', KEYS[1]); " - + "return redis.call('publish', KEYS[2], ARGV[1]); ", - Arrays.asList(getName(), topic.getChannelNames().get(0)), SUM_MSG); - future.addListener(new FutureListener() { + RFuture future = topic.publishAsync(SUM_MSG); + future.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } - - semaphore.acquireAsync(future.getNow()).addListener(new FutureListener() { + + semaphore.acquireAsync(future.getNow().intValue()).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { @@ -130,7 +123,7 @@ public abstract class RedissonBaseAdder extends RedissonExpira return; } - RFuture valueFuture = getAsync(); + RFuture valueFuture = getAndDeleteAsync(); valueFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -153,21 +146,22 @@ public abstract class RedissonBaseAdder extends RedissonExpira public RFuture resetAsync() { final RPromise result = new RedissonPromise(); - RFuture future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER, - "redis.call('del', KEYS[1]); " - + "return redis.call('publish', KEYS[2], ARGV[1]); ", - Arrays.asList(getName(), topic.getChannelNames().get(0)), CLEAR_MSG); - - future.addListener(new FutureListener() { + RFuture future = topic.publishAsync(CLEAR_MSG); + future.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } - semaphore.acquireAsync(future.getNow()).addListener(new FutureListener() { + int value = 0; + if (future.getNow() != null) { + value = future.getNow().intValue(); + } + + semaphore.acquireAsync(value).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { @@ -190,6 +184,6 @@ public abstract class RedissonBaseAdder extends RedissonExpira protected abstract RFuture addAndGetAsync(); - protected abstract RFuture getAsync(); + protected abstract RFuture getAndDeleteAsync(); } diff --git a/redisson/src/main/java/org/redisson/RedissonDoubleAdder.java b/redisson/src/main/java/org/redisson/RedissonDoubleAdder.java index b5a582db6..2803d7ef3 100644 --- a/redisson/src/main/java/org/redisson/RedissonDoubleAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonDoubleAdder.java @@ -50,8 +50,8 @@ public class RedissonDoubleAdder extends RedissonBaseAdder implements RD } @Override - protected RFuture getAsync() { - return atomicDouble.getAsync(); + protected RFuture getAndDeleteAsync() { + return atomicDouble.getAndDeleteAsync(); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonLongAdder.java b/redisson/src/main/java/org/redisson/RedissonLongAdder.java index c0faeda87..6cb4dfd22 100644 --- a/redisson/src/main/java/org/redisson/RedissonLongAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonLongAdder.java @@ -49,8 +49,8 @@ public class RedissonLongAdder extends RedissonBaseAdder implements RLongA } @Override - protected RFuture getAsync() { - return atomicLong.getAsync(); + protected RFuture getAndDeleteAsync() { + return atomicLong.getAndDeleteAsync(); } @Override From efd17b9440a6dcebef4dedad1ce2d18e62925466 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Jan 2018 12:22:08 +0300 Subject: [PATCH 10/13] Fixed - RKeys.countExists and touch return wrong result in cluster mode --- .../client/protocol/RedisCommands.java | 4 +- .../redisson/command/CommandAsyncService.java | 98 ++++++++++--------- .../redisson/command/CommandBatchService.java | 2 +- .../command/CommandReactiveBatchService.java | 4 +- 4 files changed, 58 insertions(+), 50 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index c3eac37eb..b5de691a2 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -294,9 +294,9 @@ public interface RedisCommands { RedisCommand SETNX = new RedisCommand("SETNX", new BooleanReplayConvertor()); RedisCommand PSETEX = new RedisCommand("PSETEX", new VoidReplayConvertor()); - RedisStrictCommand TOUCH_LONG = new RedisStrictCommand("TOUCH"); + RedisStrictCommand TOUCH_LONG = new RedisStrictCommand("TOUCH", new LongReplayConvertor()); RedisStrictCommand TOUCH = new RedisStrictCommand("TOUCH", new BooleanReplayConvertor()); - RedisStrictCommand EXISTS_LONG = new RedisStrictCommand("EXISTS"); + RedisStrictCommand EXISTS_LONG = new RedisStrictCommand("EXISTS", new LongReplayConvertor()); RedisStrictCommand EXISTS = new RedisStrictCommand("EXISTS", new BooleanReplayConvertor()); RedisStrictCommand NOT_EXISTS = new RedisStrictCommand("EXISTS", new BooleanNumberReplayConvertor(1L)); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 750df9db9..9dc91c96b 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -42,6 +42,7 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.RedisLoadingException; import org.redisson.client.RedisMovedException; +import org.redisson.client.RedisRedirectException; import org.redisson.client.RedisTimeoutException; import org.redisson.client.RedisTryAgainException; import org.redisson.client.WriteRedisConnectionException; @@ -62,6 +63,7 @@ import org.redisson.connection.NodeSource.Redirect; import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; +import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -181,36 +183,36 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0); + RPromise mainPromise = new RedissonPromise(); + async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); + RPromise mainPromise = new RedissonPromise(); int slot = connectionManager.calcSlot(name); - async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0); + async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(true, new NodeSource(client), codec, command, params, mainPromise, 0); + RPromise mainPromise = new RedissonPromise(); + async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture> readAllAsync(RedisCommand command, Object... params) { - final RPromise> mainPromise = connectionManager.newPromise(); + final RPromise> mainPromise = new RedissonPromise>(); final Collection nodes = connectionManager.getEntrySet(); final List results = new ArrayList(); final AtomicInteger counter = new AtomicInteger(nodes.size()); FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { + if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) { mainPromise.tryFailure(future.cause()); return; } @@ -234,16 +236,16 @@ public class CommandAsyncService implements CommandAsyncExecutor { }; for (MasterSlaveEntry entry : nodes) { - RPromise promise = connectionManager.newPromise(); + RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); + async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true); } return mainPromise; } @Override public RFuture readRandomAsync(RedisCommand command, Object... params) { - final RPromise mainPromise = connectionManager.newPromise(); + final RPromise mainPromise = new RedissonPromise(); final List nodes = new ArrayList(connectionManager.getEntrySet()); Collections.shuffle(nodes); @@ -253,7 +255,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { private void retryReadRandomAsync(final RedisCommand command, final RPromise mainPromise, final List nodes, final Object... params) { - final RPromise attemptPromise = connectionManager.newPromise(); + final RPromise attemptPromise = new RedissonPromise(); attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -274,7 +276,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); MasterSlaveEntry entry = nodes.remove(0); - async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0); + async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0, false); } @Override @@ -293,19 +295,24 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private RFuture allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object... params) { - final RPromise mainPromise = connectionManager.newPromise(); + final RPromise mainPromise = new RedissonPromise(); final Collection nodes = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(nodes.size()); FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { + if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) { mainPromise.tryFailure(future.cause()); return; } + + T result = future.getNow(); + if (future.cause() instanceof RedisRedirectException) { + result = command.getConvertor().convert(result); + } if (callback != null) { - callback.onSlotResult(future.getNow()); + callback.onSlotResult(result); } if (counter.decrementAndGet() == 0) { if (callback != null) { @@ -318,9 +325,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { }; for (MasterSlaveEntry entry : nodes) { - RPromise promise = connectionManager.newPromise(); + RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true); } return mainPromise; } @@ -339,22 +346,22 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture readAsync(String key, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); + RPromise mainPromise = new RedissonPromise(); NodeSource source = getNodeSource(key); - async(true, source, codec, command, params, mainPromise, 0); + async(true, source, codec, command, params, mainPromise, 0, false); return mainPromise; } public RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(true, new NodeSource(entry), codec, command, params, mainPromise, 0); + RPromise mainPromise = new RedissonPromise(); + async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(false, new NodeSource(entry), codec, command, params, mainPromise, 0); + RPromise mainPromise = new RedissonPromise(); + async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -396,14 +403,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { } public RFuture evalAllAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, String script, List keys, Object... params) { - final RPromise mainPromise = connectionManager.newPromise(); + final RPromise mainPromise = new RedissonPromise(); final Collection entries = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(entries.size()); FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { + if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) { mainPromise.tryFailure(future.cause()); return; } @@ -422,21 +429,21 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); for (MasterSlaveEntry entry : entries) { - RPromise promise = connectionManager.newPromise(); + RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0); + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true); } return mainPromise; } private RFuture evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); + RPromise mainPromise = new RedissonPromise(); List args = new ArrayList(2 + keys.size() + params.length); args.add(script); args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0); + async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false); return mainPromise; } @@ -447,14 +454,15 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture writeAsync(String key, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); + RPromise mainPromise = new RedissonPromise(); NodeSource source = getNodeSource(key); - async(false, source, codec, command, params, mainPromise, 0); + async(false, source, codec, command, params, mainPromise, 0, false); return mainPromise; } protected void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, - final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt) { + final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt, + final boolean ignoreRedirect) { if (mainPromise.isCancelled()) { free(params); return; @@ -490,7 +498,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { connectionFuture = connectionManager.connectionWriteOp(source, command); } - final RPromise attemptPromise = connectionManager.newPromise(); + final RPromise attemptPromise = new RedissonPromise(); details.init(connectionFuture, attemptPromise, readOnlyMode, source, codec, command, params, mainPromise, attempt); @@ -566,7 +574,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { count, details.getCommand(), Arrays.toString(details.getParams())); } details.removeMainPromiseListener(); - async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count); + async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect); AsyncDetails.release(details); } @@ -597,10 +605,10 @@ public class CommandAsyncService implements CommandAsyncExecutor { final RedisConnection connection = connFuture.getNow(); if (details.getSource().getRedirect() == Redirect.ASK) { List> list = new ArrayList>(2); - RPromise promise = connectionManager.newPromise(); + RPromise promise = new RedissonPromise(); list.add(new CommandData(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{})); list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); - RPromise main = connectionManager.newPromise(); + RPromise main = new RedissonPromise(); ChannelFuture future = connection.send(new CommandsData(main, list)); details.setWriteFuture(future); } else { @@ -626,7 +634,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - checkAttemptFuture(source, details, future); + checkAttemptFuture(source, details, future, ignoreRedirect); } }); } @@ -780,7 +788,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private void checkAttemptFuture(final NodeSource source, final AsyncDetails details, - Future future) { + Future future, boolean ignoreRedirect) { details.getTimeout().cancel(); if (future.isCancelled()) { return; @@ -788,25 +796,25 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.removeMainPromiseListener(); - if (future.cause() instanceof RedisMovedException) { + if (future.cause() instanceof RedisMovedException && !ignoreRedirect) { RedisMovedException ex = (RedisMovedException) future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } - if (future.cause() instanceof RedisAskException) { + if (future.cause() instanceof RedisAskException && !ignoreRedirect) { RedisAskException ex = (RedisAskException) future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } if (future.cause() instanceof RedisLoadingException) { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } @@ -816,7 +824,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public void run(Timeout timeout) throws Exception { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); } }, 1, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index c94b6d6d2..22589d84c 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -111,7 +111,7 @@ public class CommandBatchService extends CommandAsyncService { @Override protected void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt) { + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { if (executed) { throw new IllegalStateException("Batch already has been executed!"); } diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java index 735ca3660..37ef6b7b9 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java @@ -57,8 +57,8 @@ public class CommandReactiveBatchService extends CommandReactiveService { @Override protected void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt) { - batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt); + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { + batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect); } public RFuture> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) { From e37729deee165108b8e5261802692f03794e2b15 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Jan 2018 13:45:13 +0300 Subject: [PATCH 11/13] Comments added --- redisson/src/main/java/org/redisson/api/RMap.java | 4 ++-- .../src/main/java/org/redisson/api/RScoredSortedSet.java | 6 ++++++ .../main/java/org/redisson/api/RScoredSortedSetAsync.java | 6 ++++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index 28aa91954..b2341256f 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -38,7 +38,7 @@ import org.redisson.api.mapreduce.RMapReduce; public interface RMap extends ConcurrentMap, RExpirable, RMapAsync { /** - * Loads all map entries to this Redis map. + * Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}. * * @param replaceExistingValues - true if existed values should be replaced, false otherwise. * @param parallelism - parallelism level, used to increase speed of process execution @@ -46,7 +46,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsynckeys parameter. + * Loads map entries using {@link org.redisson.api.map.MapLoader} whose keys are listed in defined keys parameter. * * @param keys - map keys * @param replaceExistingValues - true if existed values should be replaced, false otherwise. diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 339c046b2..fb8e1b777 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -79,6 +79,12 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ Integer revRank(V o); + /** + * Returns score of element or null if it doesn't exist. + * + * @param o - element + * @return score + */ Double getScore(V o); /** diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index 481ae8159..21d8d8c5f 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -52,6 +52,12 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn RFuture revRankAsync(V o); + /** + * Returns score of element or null if it doesn't exist. + * + * @param o - element + * @return score + */ RFuture getScoreAsync(V o); /** From 1c4df641853ca3e8b5840105eb88197c9cf89888 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 23 Jan 2018 11:42:34 +0300 Subject: [PATCH 12/13] Fixed - Wrong parsing of RScript.ReturnType.MULTI result. #1251 --- .../client/handler/CommandDecoder.java | 46 ++++++++++++++----- .../org/redisson/client/handler/State.java | 18 ++++---- .../redisson/client/handler/StateLevel.java | 16 +++++++ .../java/org/redisson/RedissonScriptTest.java | 35 ++++++++++++++ 4 files changed, 94 insertions(+), 21 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 71207b635..d529ea939 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -137,12 +137,11 @@ public class CommandDecoder extends ReplayingDecoder { decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts()); - Channel channel = ctx.channel(); MultiDecoder decoder = messageDecoder(cmd, firstLevel.getParts()); if (decoder != null) { Object result = decoder.decode(firstLevel.getParts(), state()); if (data != null) { - handleResult(cmd, null, result, true, channel); + handleResult(cmd, null, result, true, ctx.channel()); } } } @@ -152,7 +151,18 @@ public class CommandDecoder extends ReplayingDecoder { state().resetLevel(); decode(in, cmd, null, ctx.channel()); } else { - decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts()); + if (firstLevel.getLastList() != null) { + decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList()); + firstLevel.setLastList(null); + firstLevel.setLastListSize(0); + + if (in.isReadable()) { + decode(in, cmd, firstLevel.getParts(), ctx.channel()); + } + decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts()); + } else { + decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts()); + } } } } @@ -287,22 +297,34 @@ public class CommandDecoder extends ReplayingDecoder { } handleResult(data, parts, result, false, channel); } else if (code == '*') { - int level = state().incLevel(); - long size = readLong(in); List respParts; - if (state().getLevels().size()-1 >= level) { - StateLevel stateLevel = state().getLevels().get(level); - respParts = stateLevel.getParts(); - size = stateLevel.getSize(); - } else { + + StateLevel lastLevel = state().getLastLevel(); + if (lastLevel != null && lastLevel.getSize() != lastLevel.getParts().size()) { respParts = new ArrayList(); - if (state().isMakeCheckpoint()) { - state().addLevel(new StateLevel(size, respParts)); + lastLevel.setLastListSize(size); + lastLevel.setLastList(respParts); + } else { + int level = state().incLevel(); + if (state().getLevels().size()-1 >= level) { + StateLevel stateLevel = state().getLevels().get(level); + respParts = stateLevel.getParts(); + size = stateLevel.getSize(); + } else { + respParts = new ArrayList(); + if (state().isMakeCheckpoint()) { + state().addLevel(new StateLevel(size, respParts)); + } } } decodeList(in, data, parts, channel, size, respParts); + + if (lastLevel != null && lastLevel.getLastList() != null) { + lastLevel.setLastList(null); + lastLevel.setLastListSize(0); + } } else { String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8); throw new IllegalStateException("Can't decode replay: " + dataStr); diff --git a/redisson/src/main/java/org/redisson/client/handler/State.java b/redisson/src/main/java/org/redisson/client/handler/State.java index 6fab0d7f3..8d7b40175 100644 --- a/redisson/src/main/java/org/redisson/client/handler/State.java +++ b/redisson/src/main/java/org/redisson/client/handler/State.java @@ -28,7 +28,6 @@ public class State { private int level = -1; private List levels; - private DecoderState decoderStateCopy; private final boolean makeCheckpoint; public State(boolean makeCheckpoint) { @@ -41,6 +40,7 @@ public class State { public void resetLevel() { level = -1; + levels.clear(); } public int decLevel() { return --level; @@ -49,6 +49,13 @@ public class State { return ++level; } + public StateLevel getLastLevel() { + if (levels == null || levels.isEmpty()) { + return null; + } + return levels.get(level); + } + public void addLevel(StateLevel stateLevel) { if (levels == null) { levels = new ArrayList(2); @@ -76,17 +83,10 @@ public class State { this.decoderState = decoderState; } - public DecoderState getDecoderStateCopy() { - return decoderStateCopy; - } - public void setDecoderStateCopy(DecoderState decoderStateCopy) { - this.decoderStateCopy = decoderStateCopy; - } - @Override public String toString() { return "State [batchIndex=" + batchIndex + ", decoderState=" + decoderState + ", level=" + level + ", levels=" - + levels + ", decoderStateCopy=" + decoderStateCopy + "]"; + + levels + "]"; } diff --git a/redisson/src/main/java/org/redisson/client/handler/StateLevel.java b/redisson/src/main/java/org/redisson/client/handler/StateLevel.java index 710728dd8..bfbd4980d 100644 --- a/redisson/src/main/java/org/redisson/client/handler/StateLevel.java +++ b/redisson/src/main/java/org/redisson/client/handler/StateLevel.java @@ -21,6 +21,8 @@ public class StateLevel { private long size; private List parts; + private long lastListSize; + private List lastList; public StateLevel(long size, List parts) { super(); @@ -28,6 +30,20 @@ public class StateLevel { this.parts = parts; } + public long getLastListSize() { + return lastListSize; + } + public void setLastListSize(long lastListSize) { + this.lastListSize = lastListSize; + } + + public List getLastList() { + return lastList; + } + public void setLastList(List lastList) { + this.lastList = lastList; + } + public long getSize() { return size; } diff --git a/redisson/src/test/java/org/redisson/RedissonScriptTest.java b/redisson/src/test/java/org/redisson/RedissonScriptTest.java index 5c0f9ecf9..7ee416ceb 100644 --- a/redisson/src/test/java/org/redisson/RedissonScriptTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScriptTest.java @@ -5,16 +5,51 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.junit.Assert; import org.junit.Test; import org.redisson.api.RFuture; +import org.redisson.api.RLexSortedSet; import org.redisson.api.RScript; import org.redisson.api.RScript.Mode; import org.redisson.client.RedisException; +import org.redisson.client.codec.StringCodec; public class RedissonScriptTest extends BaseTest { + @Test + public void testMulti() throws InterruptedException, ExecutionException { + RLexSortedSet idx2 = redisson.getLexSortedSet("ABCD17436"); + + Long l = new Long("1506524856000"); + for (int i = 0; i < 100; i++) { + String s = "DENY" + "\t" + "TESTREDISSON" + "\t" + + Long.valueOf(l) + "\t" + "helloworld_hongqin"; + idx2.add(s); + l = l + 1; + } + + StringCodec codec = new StringCodec(); + String max = "'[DENY" + "\t" + "TESTREDISSON" + "\t" + "1506524856099'"; + String min = "'[DENY" + "\t" + "TESTREDISSON" + "\t" + "1506524856000'"; + String luaScript1= "local d = {}; d[1] = redis.call('zrevrangebylex','ABCD17436'," +max+","+min+",'LIMIT',0,5); "; + luaScript1= luaScript1 + " d[2] = redis.call('zrevrangebylex','ABCD17436'," +max+","+min+",'LIMIT',0,15); "; + luaScript1= luaScript1 + " d[3] = redis.call('zrevrangebylex','ABCD17436'," +max+","+min+",'LIMIT',0,25); "; + luaScript1 = luaScript1 + " return d;"; + + Future r1 = redisson.getScript().evalAsync(RScript.Mode.READ_ONLY, codec, + luaScript1, + RScript.ReturnType.MULTI, Collections.emptyList()); + List> obj1 = (List>) r1.get(); + + assertThat(obj1).hasSize(3); + assertThat(obj1.get(0)).hasSize(5); + assertThat(obj1.get(1)).hasSize(15); + assertThat(obj1.get(2)).hasSize(25); + } + @Test public void testEval() { RScript script = redisson.getScript(); From 3db1c46c420181515d054b987c10968dd23640bf Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 23 Jan 2018 12:32:15 +0300 Subject: [PATCH 13/13] Fixed - RedissonReadLock by name with colon couldn't be unlocked properly #1219 --- .../java/org/redisson/RedissonReadLock.java | 4 +- .../redisson/RedissonReadWriteLockTest.java | 42 ++++++++++++++++++- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index 096a335ac..a16d1ad25 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -87,6 +87,8 @@ public class RedissonReadLock extends RedissonLock implements RLock { @Override protected RFuture unlockInnerAsync(long threadId) { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); + String keyPrefix = timeoutPrefix.split(":" + getLockName(threadId))[0]; + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + @@ -129,7 +131,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; ", - Arrays.asList(getName(), getChannelName(), timeoutPrefix, timeoutPrefix.split(":")[0]), + Arrays.asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), LockPubSub.unlockMessage, getLockName(threadId)); } diff --git a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java index ca411e5b6..dab811af0 100644 --- a/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReadWriteLockTest.java @@ -1,10 +1,20 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.awaitility.Awaitility.await; import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.List; import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -18,10 +28,38 @@ import org.redisson.api.RReadWriteLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; -import static org.awaitility.Awaitility.*; - public class RedissonReadWriteLockTest extends BaseConcurrentTest { + @Test + public void testName() throws InterruptedException, ExecutionException, TimeoutException { + ExecutorService service = Executors.newFixedThreadPool(10); + RReadWriteLock rwlock = redisson.getReadWriteLock("{test}:abc:key"); + RLock rlock = rwlock.readLock(); + + List> callables = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + callables.add(() -> { + for (int j = 0; j < 10; j++) { + rlock.lock(); + try { + } finally { + rlock.unlock(); + } + } + return null; + }); + } + + List> futures = service.invokeAll(callables); + for (Future future : futures) { + assertThatCode(future::get).doesNotThrowAnyException(); + } + + service.shutdown(); + assertThat(service.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); + } + + @Test public void testWriteLockExpiration() throws InterruptedException { RReadWriteLock rw1 = redisson.getReadWriteLock("test2s3");