From e56c6e007517fb1b36fd2b37a2039fb0e3354efb Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 28 Nov 2015 18:49:30 +0300 Subject: [PATCH] Initial RedissonSetReactive support. #210 --- .../java/org/redisson/RedissonReactive.java | 10 + .../org/redisson/RedissonReactiveClient.java | 39 +--- src/main/java/org/redisson/RedissonSet.java | 4 +- .../org/redisson/RedissonSetReactive.java | 214 ++++++++++++++++++ .../client/protocol/RedisCommands.java | 6 +- .../java/org/redisson/core/RSetReactive.java | 37 +++ 6 files changed, 278 insertions(+), 32 deletions(-) create mode 100644 src/main/java/org/redisson/RedissonSetReactive.java create mode 100644 src/main/java/org/redisson/core/RSetReactive.java diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index 5811bfe00..099d4542c 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -33,6 +33,7 @@ import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RListReactive; import org.redisson.core.RMap; import org.redisson.core.RMapReactive; +import org.redisson.core.RSetReactive; import io.netty.util.concurrent.Future; @@ -131,6 +132,15 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonMapReactive(codec, commandExecutor, name); } + public RSetReactive getSet(String name) { + return new RedissonSetReactive(commandExecutor, name); + } + + public RSetReactive getSet(String name, Codec codec) { + return new RedissonSetReactive(codec, commandExecutor, name); + + } + @Override public void shutdown() { connectionManager.shutdown(); diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index 59bd047a9..69bd2cb66 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -23,6 +23,7 @@ import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RListReactive; import org.redisson.core.RMap; import org.redisson.core.RMapReactive; +import org.redisson.core.RSetReactive; public interface RedissonReactiveClient { @@ -71,34 +72,16 @@ public interface RedissonReactiveClient { RMapReactive getMap(String name, Codec codec); -// /** -// * Returns lock instance by name. -// * -// * @param name of lock -// * @return -// */ -// RLock getLock(String name); -// -// /** -// * Returns set instance by name. -// * -// * @param name of set -// * @return -// */ -// RSet getSet(String name); -// -// RSet getSet(String name, Codec codec); -// -// /** -// * Returns sorted set instance by name. -// * -// * @param name of sorted set -// * @return -// */ -// RSortedSet getSortedSet(String name); -// -// RSortedSet getSortedSet(String name, Codec codec); -// + /** + * Returns set instance by name. + * + * @param name of set + * @return + */ + RSetReactive getSet(String name); + + RSetReactive getSet(String name, Codec codec); + // /** // * Returns Redis Sorted Set instance by name // * diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index d9e4b0713..99db12786 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -58,7 +58,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { @Override public Future sizeAsync() { - return commandExecutor.readAsync(getName(), codec, RedisCommands.SCARD, getName()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.SCARD_INT, getName()); } @Override @@ -225,7 +225,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { List args = new ArrayList(c.size() + 1); args.add(getName()); args.addAll(c); - return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD, args.toArray()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD_BOOL, args.toArray()); } @Override diff --git a/src/main/java/org/redisson/RedissonSetReactive.java b/src/main/java/org/redisson/RedissonSetReactive.java new file mode 100644 index 000000000..75deaa663 --- /dev/null +++ b/src/main/java/org/redisson/RedissonSetReactive.java @@ -0,0 +1,214 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.core.RSet; +import org.redisson.core.RSetReactive; + +/** + * Distributed and concurrent implementation of {@link java.util.Set} + * + * @author Nikita Koksharov + * + * @param value + */ +public class RedissonSetReactive extends RedissonExpirableReactive implements RSetReactive { + + private static final RedisCommand EVAL_OBJECTS = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4); + + protected RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) { + super(commandExecutor, name); + } + + public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + + @Override + public Publisher size() { + return commandExecutor.readObservable(getName(), codec, RedisCommands.SCARD, getName()); + } + + @Override + public Publisher contains(Object o) { + return commandExecutor.readObservable(getName(), codec, RedisCommands.SISMEMBER, getName(), o); + } + +// private ListScanResult scanIterator(InetSocketAddress client, long startPos) { +// Publisher> f = commandExecutor.readObservable(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); +// return get(f); +// } +// +// @Override +// public Iterator iterator() { +// return new Iterator() { +// +// private List firstValues; +// private Iterator iter; +// private InetSocketAddress client; +// private long nextIterPos; +// +// private boolean currentElementRemoved; +// private boolean removeExecuted; +// private V value; +// +// @Override +// public boolean hasNext() { +// if (iter == null || !iter.hasNext()) { +// if (nextIterPos == -1) { +// return false; +// } +// long prevIterPos = nextIterPos; +// ListScanResult res = scanIterator(client, nextIterPos); +// client = res.getRedisClient(); +// if (nextIterPos == 0 && firstValues == null) { +// firstValues = res.getValues(); +// } else if (res.getValues().equals(firstValues)) { +// return false; +// } +// iter = res.getValues().iterator(); +// nextIterPos = res.getPos(); +// if (prevIterPos == nextIterPos && !removeExecuted) { +// nextIterPos = -1; +// } +// } +// return iter.hasNext(); +// } +// +// @Override +// public V next() { +// if (!hasNext()) { +// throw new NoSuchElementException("No such element at index"); +// } +// +// value = iter.next(); +// currentElementRemoved = false; +// return value; +// } +// +// @Override +// public void remove() { +// if (currentElementRemoved) { +// throw new IllegalStateException("Element been already deleted"); +// } +// if (iter == null) { +// throw new IllegalStateException(); +// } +// +// iter.remove(); +// RedissonSetReactive.this.remove(value); +// currentElementRemoved = true; +// removeExecuted = true; +// } +// +// }; +// } + + @Override + public Publisher add(V e) { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.SADD_SINGLE, getName(), e); + } + + @Override + public Publisher removeRandom() { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.SPOP_SINGLE, getName()); + } + + @Override + public Publisher remove(Object o) { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.SREM_SINGLE, getName(), o); + } + + @Override + public Publisher containsAll(Collection c) { + return commandExecutor.evalReadObservable(getName(), codec, EVAL_OBJECTS, + "local s = redis.call('smembers', KEYS[1]);" + + "for i = 0, table.getn(s), 1 do " + + "for j = 0, table.getn(ARGV), 1 do " + + "if ARGV[j] == s[i] " + + "then table.remove(ARGV, j) end " + + "end; " + + "end;" + + "return table.getn(ARGV) == 0; ", + Collections.singletonList(getName()), c.toArray()); + } + + @Override + public Publisher addAll(Collection c) { + List args = new ArrayList(c.size() + 1); + args.add(getName()); + args.addAll(c); + return commandExecutor.writeObservable(getName(), codec, RedisCommands.SADD, args.toArray()); + } + + @Override + public Publisher retainAll(Collection c) { + return commandExecutor.evalWriteObservable(getName(), codec, EVAL_OBJECTS, + "local changed = false " + + "local s = redis.call('smembers', KEYS[1]) " + + "local i = 0 " + + "while i <= table.getn(s) do " + + "local element = s[i] " + + "local isInAgrs = false " + + "for j = 0, table.getn(ARGV), 1 do " + + "if ARGV[j] == element then " + + "isInAgrs = true " + + "break " + + "end " + + "end " + + "if isInAgrs == false then " + + "redis.call('SREM', KEYS[1], element) " + + "changed = true " + + "end " + + "i = i + 1 " + + "end " + + "return changed ", + Collections.singletonList(getName()), c.toArray()); + } + + @Override + public Publisher removeAll(Collection c) { + return commandExecutor.evalWriteObservable(getName(), codec, EVAL_OBJECTS, + "local v = false " + + "for i = 0, table.getn(ARGV), 1 do " + + "if redis.call('srem', KEYS[1], ARGV[i]) == 1 " + + "then v = true end " + +"end " + + "return v ", + Collections.singletonList(getName()), c.toArray()); + } + + @Override + public Publisher iterator() { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index ad66044a3..2efc232b3 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -93,14 +93,16 @@ public interface RedisCommands { RedisCommand> EXEC = new RedisCommand>("EXEC", new ObjectListReplayDecoder()); RedisCommand SREM = new RedisCommand("SREM", 2, ValueType.OBJECTS); - RedisCommand SADD = new RedisCommand("SADD", new BooleanAmountReplayConvertor(), 2, ValueType.OBJECTS); + RedisCommand SADD_BOOL = new RedisCommand("SADD", new BooleanAmountReplayConvertor(), 2, ValueType.OBJECTS); + RedisStrictCommand SADD = new RedisStrictCommand("SADD", 2, ValueType.OBJECTS); RedisCommand SPOP_SINGLE = new RedisCommand("SPOP"); RedisCommand SADD_SINGLE = new RedisCommand("SADD", new BooleanReplayConvertor(), 2); RedisCommand SREM_SINGLE = new RedisCommand("SREM", new BooleanReplayConvertor(), 2); RedisCommand> SMEMBERS = new RedisCommand>("SMEMBERS", new ObjectListReplayDecoder()); RedisCommand> SSCAN = new RedisCommand>("SSCAN", new NestedMultiDecoder(new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.OBJECT); RedisCommand SISMEMBER = new RedisCommand("SISMEMBER", new BooleanReplayConvertor(), 2); - RedisStrictCommand SCARD = new RedisStrictCommand("SCARD", new IntegerReplayConvertor()); + RedisStrictCommand SCARD_INT = new RedisStrictCommand("SCARD", new IntegerReplayConvertor()); + RedisStrictCommand SCARD = new RedisStrictCommand("SCARD"); RedisCommand LSET = new RedisCommand("LSET", new VoidReplayConvertor(), 3); RedisCommand LPOP = new RedisCommand("LPOP"); diff --git a/src/main/java/org/redisson/core/RSetReactive.java b/src/main/java/org/redisson/core/RSetReactive.java new file mode 100644 index 000000000..4c7ffbcc9 --- /dev/null +++ b/src/main/java/org/redisson/core/RSetReactive.java @@ -0,0 +1,37 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import org.reactivestreams.Publisher; + +/** + * Async set functions + * + * @author Nikita Koksharov + * + * @param value + */ +public interface RSetReactive extends RCollectionReactive { + + /** + * Removes and returns random element from set + * in async mode + * + * @return + */ + Publisher removeRandom(); + +}