diff --git a/src/main/java/org/redisson/RedissonBatch.java b/src/main/java/org/redisson/RedissonBatch.java index 978fb5b65..895ea8c14 100644 --- a/src/main/java/org/redisson/RedissonBatch.java +++ b/src/main/java/org/redisson/RedissonBatch.java @@ -18,7 +18,7 @@ package org.redisson; import java.util.List; import org.redisson.client.codec.Codec; -import org.redisson.command.CommandBatchAsyncService; +import org.redisson.command.CommandBatchService; import org.redisson.connection.ConnectionManager; import org.redisson.core.RAtomicLongAsync; import org.redisson.core.RBatch; @@ -41,10 +41,10 @@ import io.netty.util.concurrent.Future; public class RedissonBatch implements RBatch { - private final CommandBatchAsyncService executorService; + private final CommandBatchService executorService; public RedissonBatch(ConnectionManager connectionManager) { - this.executorService = new CommandBatchAsyncService(connectionManager); + this.executorService = new CommandBatchService(connectionManager); } @Override diff --git a/src/main/java/org/redisson/RedissonBatchReactive.java b/src/main/java/org/redisson/RedissonBatchReactive.java new file mode 100644 index 000000000..e93efa934 --- /dev/null +++ b/src/main/java/org/redisson/RedissonBatchReactive.java @@ -0,0 +1,178 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.List; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandBatchService; +import org.redisson.connection.ConnectionManager; +import org.redisson.core.RAtomicLongReactive; +import org.redisson.core.RBatchReactive; +import org.redisson.core.RBitSetReactive; +import org.redisson.core.RBlockingQueueReactive; +import org.redisson.core.RBucketReactive; +import org.redisson.core.RDequeReactive; +import org.redisson.core.RHyperLogLogReactive; +import org.redisson.core.RLexSortedSetReactive; +import org.redisson.core.RListReactive; +import org.redisson.core.RMapReactive; +import org.redisson.core.RQueueReactive; +import org.redisson.core.RScoredSortedSetReactive; +import org.redisson.core.RScriptReactive; +import org.redisson.core.RSetReactive; +import org.redisson.core.RTopicReactive; + +public class RedissonBatchReactive implements RBatchReactive { + + private final CommandBatchService executorService; + + public RedissonBatchReactive(ConnectionManager connectionManager) { + this.executorService = new CommandBatchService(connectionManager); + } + + @Override + public RBucketReactive getBucket(String name) { + return new RedissonBucketReactive(executorService, name); + } + + @Override + public RBucketReactive getBucket(String name, Codec codec) { + return new RedissonBucketReactive(codec, executorService, name); + } + + @Override + public RHyperLogLogReactive getHyperLogLog(String name) { + return new RedissonHyperLogLogReactive(executorService, name); + } + + @Override + public RHyperLogLogReactive getHyperLogLog(String name, Codec codec) { + return new RedissonHyperLogLogReactive(codec, executorService, name); + } + + @Override + public RListReactive getList(String name) { + return new RedissonListReactive(executorService, name); + } + + @Override + public RListReactive getList(String name, Codec codec) { + return new RedissonListReactive(codec, executorService, name); + } + + @Override + public RMapReactive getMap(String name) { + return new RedissonMapReactive(executorService, name); + } + + @Override + public RMapReactive getMap(String name, Codec codec) { + return new RedissonMapReactive(codec, executorService, name); + } + + @Override + public RSetReactive getSet(String name) { + return new RedissonSetReactive(executorService, name); + } + + @Override + public RSetReactive getSet(String name, Codec codec) { + return new RedissonSetReactive(codec, executorService, name); + } + + @Override + public RTopicReactive getTopic(String name) { + return new RedissonTopicReactive(executorService, name); + } + + @Override + public RTopicReactive getTopic(String name, Codec codec) { + return new RedissonTopicReactive(codec, executorService, name); + } + + @Override + public RQueueReactive getQueue(String name) { + return new RedissonQueueReactive(executorService, name); + } + + @Override + public RQueueReactive getQueue(String name, Codec codec) { + return new RedissonQueueReactive(codec, executorService, name); + } + + @Override + public RBlockingQueueReactive getBlockingQueue(String name) { + return new RedissonBlockingQueueReactive(executorService, name); + } + + @Override + public RBlockingQueueReactive getBlockingQueue(String name, Codec codec) { + return new RedissonBlockingQueueReactive(codec, executorService, name); + } + + @Override + public RDequeReactive getDequeReactive(String name) { + return new RedissonDequeReactive(executorService, name); + } + + @Override + public RDequeReactive getDequeReactive(String name, Codec codec) { + return new RedissonDequeReactive(codec, executorService, name); + } + + @Override + public RAtomicLongReactive getAtomicLongReactive(String name) { + return new RedissonAtomicLongReactive(executorService, name); + } + + @Override + public RScoredSortedSetReactive getScoredSortedSet(String name) { + return new RedissonScoredSortedSetReactive(executorService, name); + } + + @Override + public RScoredSortedSetReactive getScoredSortedSet(String name, Codec codec) { + return new RedissonScoredSortedSetReactive(codec, executorService, name); + } + + @Override + public RLexSortedSetReactive getLexSortedSet(String name) { + return new RedissonLexSortedSetReactive(executorService, name); + } + + @Override + public RBitSetReactive getBitSet(String name) { + return new RedissonBitSetReactive(executorService, name); + } + + @Override + public RScriptReactive getScript() { + return new RedissonScriptReactive(executorService); + } + +// @Override +// public RKeysReactive getKeys() { +// return new RedissonKeys(executorService); +// } + + @Override + public Publisher> executeReactive() { + return new NettyFuturePublisher>(executorService.executeAsync()); + } + +} diff --git a/src/main/java/org/redisson/RedissonBitSet.java b/src/main/java/org/redisson/RedissonBitSet.java index 4039e4fd5..aef6ed9b8 100644 --- a/src/main/java/org/redisson/RedissonBitSet.java +++ b/src/main/java/org/redisson/RedissonBitSet.java @@ -24,7 +24,7 @@ import java.util.List; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.command.CommandBatchAsyncService; +import org.redisson.command.CommandBatchService; import org.redisson.core.RBitSet; import io.netty.util.concurrent.Future; @@ -180,7 +180,7 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet { @Override public Future clearAsync(int fromIndex, int toIndex) { - CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager()); + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); for (int i = fromIndex; i < toIndex; i++) { executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0); } @@ -199,7 +199,7 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet { @Override public Future setAsync(int fromIndex, int toIndex) { - CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager()); + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); for (int i = fromIndex; i < toIndex; i++) { executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1); } diff --git a/src/main/java/org/redisson/RedissonBitSetReactive.java b/src/main/java/org/redisson/RedissonBitSetReactive.java index d2cae8cd0..8aee70d94 100644 --- a/src/main/java/org/redisson/RedissonBitSetReactive.java +++ b/src/main/java/org/redisson/RedissonBitSetReactive.java @@ -25,7 +25,7 @@ import org.reactivestreams.Publisher; import org.redisson.client.codec.BitSetCodec; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.command.CommandBatchAsyncService; +import org.redisson.command.CommandBatchService; import org.redisson.command.CommandReactiveExecutor; import org.redisson.core.RBitSetReactive; @@ -98,7 +98,7 @@ public class RedissonBitSetReactive extends RedissonExpirableReactive implements @Override public Publisher clear(int fromIndex, int toIndex) { - CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager()); + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); for (int i = fromIndex; i < toIndex; i++) { executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0); } @@ -117,7 +117,7 @@ public class RedissonBitSetReactive extends RedissonExpirableReactive implements @Override public Publisher set(int fromIndex, int toIndex) { - CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager()); + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); for (int i = fromIndex; i < toIndex; i++) { executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1); } diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index 62eceee03..24d4fc9ae 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -30,6 +30,7 @@ import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SingleConnectionManager; import org.redisson.core.RAtomicLongReactive; +import org.redisson.core.RBatchReactive; import org.redisson.core.RBitSetReactive; import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; @@ -232,6 +233,11 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonScriptReactive(commandExecutor); } + @Override + public RBatchReactive createBatch() { + return new RedissonBatchReactive(connectionManager); + } + public Config getConfig() { return config; } diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index 9674e5b06..3af9b9118 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -19,6 +19,7 @@ import java.util.List; import org.redisson.client.codec.Codec; import org.redisson.core.RAtomicLongReactive; +import org.redisson.core.RBatchReactive; import org.redisson.core.RBitSetReactive; import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; @@ -185,16 +186,16 @@ public interface RedissonReactiveClient { */ RScriptReactive getScript(); -// /** -// * Return batch object which executes group of -// * command in pipeline. -// * -// * See http://redis.io/topics/pipelining -// * -// * @return -// */ -// RBatch createBatch(); -// + /** + * Return batch object which executes group of + * command in pipeline. + * + * See http://redis.io/topics/pipelining + * + * @return + */ + RBatchReactive createBatch(); + // /** // * Returns keys operations. // * Each of Redis/Redisson object associated with own key diff --git a/src/main/java/org/redisson/command/CommandBatchAsyncService.java b/src/main/java/org/redisson/command/CommandBatchService.java similarity index 99% rename from src/main/java/org/redisson/command/CommandBatchAsyncService.java rename to src/main/java/org/redisson/command/CommandBatchService.java index 376fa018f..d52ae4816 100644 --- a/src/main/java/org/redisson/command/CommandBatchAsyncService.java +++ b/src/main/java/org/redisson/command/CommandBatchService.java @@ -51,7 +51,7 @@ import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; -public class CommandBatchAsyncService extends CommandAsyncService { +public class CommandBatchService extends CommandReactiveService { public static class CommandEntry implements Comparable { @@ -101,7 +101,7 @@ public class CommandBatchAsyncService extends CommandAsyncService { private boolean executed; - public CommandBatchAsyncService(ConnectionManager connectionManager) { + public CommandBatchService(ConnectionManager connectionManager) { super(connectionManager); } diff --git a/src/main/java/org/redisson/core/RBatchReactive.java b/src/main/java/org/redisson/core/RBatchReactive.java new file mode 100644 index 000000000..feafe0f4e --- /dev/null +++ b/src/main/java/org/redisson/core/RBatchReactive.java @@ -0,0 +1,185 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.List; + +import org.reactivestreams.Publisher; +import org.redisson.client.RedisException; +import org.redisson.client.codec.Codec; + +import io.netty.util.concurrent.Future; + +/** + * Interface for using pipeline feature. + * + * All methods invocations via Reactive objects + * which have gotten from this interface are batched + * to separate queue and could be executed later + * with execute() or executeReactive() methods. + * + * + * @author Nikita Koksharov + * + */ +public interface RBatchReactive { + + /** + * Returns object holder by name + * + * @param name of object + * @return + */ + RBucketReactive getBucket(String name); + + RBucketReactive getBucket(String name, Codec codec); + + /** + * Returns HyperLogLog object + * + * @param name of object + * @return + */ + RHyperLogLogReactive getHyperLogLog(String name); + + RHyperLogLogReactive getHyperLogLog(String name, Codec codec); + + /** + * Returns list instance by name. + * + * @param name of list + * @return + */ + RListReactive getList(String name); + + RListReactive getList(String name, Codec codec); + + /** + * Returns map instance by name. + * + * @param name of map + * @return + */ + RMapReactive getMap(String name); + + RMapReactive getMap(String name, Codec codec); + + /** + * Returns set instance by name. + * + * @param name of set + * @return + */ + RSetReactive getSet(String name); + + RSetReactive getSet(String name, Codec codec); + + /** + * Returns topic instance by name. + * + * @param name of topic + * @return + */ + RTopicReactive getTopic(String name); + + RTopicReactive getTopic(String name, Codec codec); + + /** + * Returns queue instance by name. + * + * @param name of queue + * @return + */ + RQueueReactive getQueue(String name); + + RQueueReactive getQueue(String name, Codec codec); + + /** + * Returns blocking queue instance by name. + * + * @param name of queue + * @return + */ + RBlockingQueueReactive getBlockingQueue(String name); + + RBlockingQueueReactive getBlockingQueue(String name, Codec codec); + + /** + * Returns deque instance by name. + * + * @param name of deque + * @return + */ + RDequeReactive getDequeReactive(String name); + + RDequeReactive getDequeReactive(String name, Codec codec); + + /** + * Returns "atomic long" instance by name. + * + * @param name of the "atomic long" + * @return + */ + RAtomicLongReactive getAtomicLongReactive(String name); + + /** + * Returns Redis Sorted Set instance by name + * + * @param name + * @return + */ + RScoredSortedSetReactive getScoredSortedSet(String name); + + RScoredSortedSetReactive getScoredSortedSet(String name, Codec codec); + + /** + * Returns String based Redis Sorted Set instance by name + * All elements are inserted with the same score during addition, + * in order to force lexicographical ordering + * + * @param name + * @return + */ + RLexSortedSetReactive getLexSortedSet(String name); + + RBitSetReactive getBitSet(String name); + + /** + * Returns script operations object + * + * @return + */ + RScriptReactive getScript(); + +// /** +// * Returns keys operations. +// * Each of Redis/Redisson object associated with own key +// * +// * @return +// */ +// RKeysReactive getKeys(); + + /** + * Executes all operations accumulated during Reactive methods invocations Reactivehronously. + * + * In cluster configurations operations grouped by slot ids + * so may be executed on different servers. Thus command execution order could be changed + * + * @return List with result object for each command + */ + Publisher> executeReactive(); + +}