RBatchReactive added. #210

pull/337/head
Nikita 9 years ago
parent bf4897793e
commit 4d9c84b727

@ -18,7 +18,7 @@ package org.redisson;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandBatchAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLongAsync;
import org.redisson.core.RBatch;
@ -41,10 +41,10 @@ import io.netty.util.concurrent.Future;
public class RedissonBatch implements RBatch {
private final CommandBatchAsyncService executorService;
private final CommandBatchService executorService;
public RedissonBatch(ConnectionManager connectionManager) {
this.executorService = new CommandBatchAsyncService(connectionManager);
this.executorService = new CommandBatchService(connectionManager);
}
@Override

@ -0,0 +1,178 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLongReactive;
import org.redisson.core.RBatchReactive;
import org.redisson.core.RBitSetReactive;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RDequeReactive;
import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RLexSortedSetReactive;
import org.redisson.core.RListReactive;
import org.redisson.core.RMapReactive;
import org.redisson.core.RQueueReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RScriptReactive;
import org.redisson.core.RSetReactive;
import org.redisson.core.RTopicReactive;
public class RedissonBatchReactive implements RBatchReactive {
private final CommandBatchService executorService;
public RedissonBatchReactive(ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager);
}
@Override
public <V> RBucketReactive<V> getBucket(String name) {
return new RedissonBucketReactive<V>(executorService, name);
}
@Override
public <V> RBucketReactive<V> getBucket(String name, Codec codec) {
return new RedissonBucketReactive<V>(codec, executorService, name);
}
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(String name) {
return new RedissonHyperLogLogReactive<V>(executorService, name);
}
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(String name, Codec codec) {
return new RedissonHyperLogLogReactive<V>(codec, executorService, name);
}
@Override
public <V> RListReactive<V> getList(String name) {
return new RedissonListReactive<V>(executorService, name);
}
@Override
public <V> RListReactive<V> getList(String name, Codec codec) {
return new RedissonListReactive<V>(codec, executorService, name);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name) {
return new RedissonMapReactive<K, V>(executorService, name);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec) {
return new RedissonMapReactive<K, V>(codec, executorService, name);
}
@Override
public <V> RSetReactive<V> getSet(String name) {
return new RedissonSetReactive<V>(executorService, name);
}
@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
return new RedissonSetReactive<V>(codec, executorService, name);
}
@Override
public <M> RTopicReactive<M> getTopic(String name) {
return new RedissonTopicReactive<M>(executorService, name);
}
@Override
public <M> RTopicReactive<M> getTopic(String name, Codec codec) {
return new RedissonTopicReactive<M>(codec, executorService, name);
}
@Override
public <V> RQueueReactive<V> getQueue(String name) {
return new RedissonQueueReactive<V>(executorService, name);
}
@Override
public <V> RQueueReactive<V> getQueue(String name, Codec codec) {
return new RedissonQueueReactive<V>(codec, executorService, name);
}
@Override
public <V> RBlockingQueueReactive<V> getBlockingQueue(String name) {
return new RedissonBlockingQueueReactive<V>(executorService, name);
}
@Override
public <V> RBlockingQueueReactive<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueueReactive<V>(codec, executorService, name);
}
@Override
public <V> RDequeReactive<V> getDequeReactive(String name) {
return new RedissonDequeReactive<V>(executorService, name);
}
@Override
public <V> RDequeReactive<V> getDequeReactive(String name, Codec codec) {
return new RedissonDequeReactive<V>(codec, executorService, name);
}
@Override
public RAtomicLongReactive getAtomicLongReactive(String name) {
return new RedissonAtomicLongReactive(executorService, name);
}
@Override
public <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSetReactive<V>(executorService, name);
}
@Override
public <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name, Codec codec) {
return new RedissonScoredSortedSetReactive<V>(codec, executorService, name);
}
@Override
public RLexSortedSetReactive getLexSortedSet(String name) {
return new RedissonLexSortedSetReactive(executorService, name);
}
@Override
public RBitSetReactive getBitSet(String name) {
return new RedissonBitSetReactive(executorService, name);
}
@Override
public RScriptReactive getScript() {
return new RedissonScriptReactive(executorService);
}
// @Override
// public RKeysReactive getKeys() {
// return new RedissonKeys(executorService);
// }
@Override
public Publisher<List<?>> executeReactive() {
return new NettyFuturePublisher<List<?>>(executorService.executeAsync());
}
}

@ -24,7 +24,7 @@ import java.util.List;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.core.RBitSet;
import io.netty.util.concurrent.Future;
@ -180,7 +180,7 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
@Override
public Future<Void> clearAsync(int fromIndex, int toIndex) {
CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0);
}
@ -199,7 +199,7 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
@Override
public Future<Void> setAsync(int fromIndex, int toIndex) {
CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1);
}

@ -25,7 +25,7 @@ import org.reactivestreams.Publisher;
import org.redisson.client.codec.BitSetCodec;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandBatchAsyncService;
import org.redisson.command.CommandBatchService;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.core.RBitSetReactive;
@ -98,7 +98,7 @@ public class RedissonBitSetReactive extends RedissonExpirableReactive implements
@Override
public Publisher<Void> clear(int fromIndex, int toIndex) {
CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0);
}
@ -117,7 +117,7 @@ public class RedissonBitSetReactive extends RedissonExpirableReactive implements
@Override
public Publisher<Void> set(int fromIndex, int toIndex) {
CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager());
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1);
}

@ -30,6 +30,7 @@ import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.RAtomicLongReactive;
import org.redisson.core.RBatchReactive;
import org.redisson.core.RBitSetReactive;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
@ -232,6 +233,11 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonScriptReactive(commandExecutor);
}
@Override
public RBatchReactive createBatch() {
return new RedissonBatchReactive(connectionManager);
}
public Config getConfig() {
return config;
}

@ -19,6 +19,7 @@ import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.core.RAtomicLongReactive;
import org.redisson.core.RBatchReactive;
import org.redisson.core.RBitSetReactive;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
@ -185,16 +186,16 @@ public interface RedissonReactiveClient {
*/
RScriptReactive getScript();
// /**
// * Return batch object which executes group of
// * command in pipeline.
// *
// * See <a href="http://redis.io/topics/pipelining">http://redis.io/topics/pipelining</a>
// *
// * @return
// */
// RBatch createBatch();
//
/**
* Return batch object which executes group of
* command in pipeline.
*
* See <a href="http://redis.io/topics/pipelining">http://redis.io/topics/pipelining</a>
*
* @return
*/
RBatchReactive createBatch();
// /**
// * Returns keys operations.
// * Each of Redis/Redisson object associated with own key

@ -51,7 +51,7 @@ import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
public class CommandBatchAsyncService extends CommandAsyncService {
public class CommandBatchService extends CommandReactiveService {
public static class CommandEntry implements Comparable<CommandEntry> {
@ -101,7 +101,7 @@ public class CommandBatchAsyncService extends CommandAsyncService {
private boolean executed;
public CommandBatchAsyncService(ConnectionManager connectionManager) {
public CommandBatchService(ConnectionManager connectionManager) {
super(connectionManager);
}

@ -0,0 +1,185 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import io.netty.util.concurrent.Future;
/**
* Interface for using pipeline feature.
*
* All methods invocations via Reactive objects
* which have gotten from this interface are batched
* to separate queue and could be executed later
* with <code>execute()</code> or <code>executeReactive()</code> methods.
*
*
* @author Nikita Koksharov
*
*/
public interface RBatchReactive {
/**
* Returns object holder by name
*
* @param name of object
* @return
*/
<V> RBucketReactive<V> getBucket(String name);
<V> RBucketReactive<V> getBucket(String name, Codec codec);
/**
* Returns HyperLogLog object
*
* @param name of object
* @return
*/
<V> RHyperLogLogReactive<V> getHyperLogLog(String name);
<V> RHyperLogLogReactive<V> getHyperLogLog(String name, Codec codec);
/**
* Returns list instance by name.
*
* @param name of list
* @return
*/
<V> RListReactive<V> getList(String name);
<V> RListReactive<V> getList(String name, Codec codec);
/**
* Returns map instance by name.
*
* @param name of map
* @return
*/
<K, V> RMapReactive<K, V> getMap(String name);
<K, V> RMapReactive<K, V> getMap(String name, Codec codec);
/**
* Returns set instance by name.
*
* @param name of set
* @return
*/
<V> RSetReactive<V> getSet(String name);
<V> RSetReactive<V> getSet(String name, Codec codec);
/**
* Returns topic instance by name.
*
* @param name of topic
* @return
*/
<M> RTopicReactive<M> getTopic(String name);
<M> RTopicReactive<M> getTopic(String name, Codec codec);
/**
* Returns queue instance by name.
*
* @param name of queue
* @return
*/
<V> RQueueReactive<V> getQueue(String name);
<V> RQueueReactive<V> getQueue(String name, Codec codec);
/**
* Returns blocking queue instance by name.
*
* @param name of queue
* @return
*/
<V> RBlockingQueueReactive<V> getBlockingQueue(String name);
<V> RBlockingQueueReactive<V> getBlockingQueue(String name, Codec codec);
/**
* Returns deque instance by name.
*
* @param name of deque
* @return
*/
<V> RDequeReactive<V> getDequeReactive(String name);
<V> RDequeReactive<V> getDequeReactive(String name, Codec codec);
/**
* Returns "atomic long" instance by name.
*
* @param name of the "atomic long"
* @return
*/
RAtomicLongReactive getAtomicLongReactive(String name);
/**
* Returns Redis Sorted Set instance by name
*
* @param name
* @return
*/
<V> RScoredSortedSetReactive<V> getScoredSortedSet(String name);
<V> RScoredSortedSetReactive<V> getScoredSortedSet(String name, Codec codec);
/**
* Returns String based Redis Sorted Set instance by name
* All elements are inserted with the same score during addition,
* in order to force lexicographical ordering
*
* @param name
* @return
*/
RLexSortedSetReactive getLexSortedSet(String name);
RBitSetReactive getBitSet(String name);
/**
* Returns script operations object
*
* @return
*/
RScriptReactive getScript();
// /**
// * Returns keys operations.
// * Each of Redis/Redisson object associated with own key
// *
// * @return
// */
// RKeysReactive getKeys();
/**
* Executes all operations accumulated during Reactive methods invocations Reactivehronously.
*
* In cluster configurations operations grouped by slot ids
* so may be executed on different servers. Thus command execution order could be changed
*
* @return List with result object for each command
*/
Publisher<List<?>> executeReactive();
}
Loading…
Cancel
Save