RSetCacheReactive added. #321

pull/337/head
Nikita 9 years ago
parent 44f255c0b0
commit b03a34f20b

@ -345,7 +345,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" +
"redis.call('pexpire', KEYS[2], ARGV[1]); " +
"return redis.call('pexpire', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeUnit.toSeconds(timeToLive));
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeUnit.toMillis(timeToLive));
}
@Override

@ -35,6 +35,7 @@ import org.redisson.api.RPatternTopicReactive;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
@ -66,6 +67,7 @@ import org.redisson.reactive.RedissonPatternTopicReactive;
import org.redisson.reactive.RedissonQueueReactive;
import org.redisson.reactive.RedissonScoredSortedSetReactive;
import org.redisson.reactive.RedissonScriptReactive;
import org.redisson.reactive.RedissonSetCacheReactive;
import org.redisson.reactive.RedissonSetReactive;
import org.redisson.reactive.RedissonTopicReactive;
@ -245,6 +247,16 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonDequeReactive<V>(codec, commandExecutor, name);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(evictionScheduler, commandExecutor, name);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, evictionScheduler, commandExecutor, name);
}
@Override
public RAtomicLongReactive getAtomicLong(String name) {
return new RedissonAtomicLongReactive(commandExecutor, name);
@ -293,5 +305,6 @@ public class RedissonReactive implements RedissonReactiveClient {
connectionManager.shutdown();
}
}

@ -105,7 +105,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
stopRefreshTask();
cancelExpirationRenewal();
}
}
@ -115,7 +115,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
}
Future<Boolean> forceUnlockAsync() {
stopRefreshTask();
cancelExpirationRenewal();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +

@ -498,7 +498,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" +
"redis.call('pexpire', KEYS[2], ARGV[1]); "
+ "return redis.call('pexpire', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeUnit.toSeconds(timeToLive));
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeUnit.toMillis(timeToLive));
}
@Override

@ -107,7 +107,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
stopRefreshTask();
cancelExpirationRenewal();
}
}
@ -117,7 +117,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
}
Future<Boolean> forceUnlockAsync() {
stopRefreshTask();
cancelExpirationRenewal();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +

@ -33,6 +33,33 @@ import org.redisson.client.codec.Codec;
*/
public interface RBatchReactive {
/**
* Returns set-based cache instance by <code>name</code>.
* Uses map (value_hash, value) under the hood for minimal memory consumption.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
<V> RSetCacheReactive<V> getSetCache(String name);
/**
* Returns set-based cache instance by <code>name</code>
* using provided <code>codec</code> for values.
* Uses map (value_hash, value) under the hood for minimal memory consumption.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
<V> RSetCacheReactive<V> getSetCache(String name, Codec codec);
/**
* Returns map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.
@ -175,6 +202,12 @@ public interface RBatchReactive {
*/
RLexSortedSetReactive getLexSortedSet(String name);
/**
* Returns bitSet instance by name.
*
* @param name of bitSet
* @return
*/
RBitSetReactive getBitSet(String name);
/**

@ -0,0 +1,42 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
/**
* Async set functions
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public interface RSetCacheReactive<V> extends RCollectionReactive<V> {
Publisher<Boolean> add(V value, long ttl, TimeUnit unit);
/**
* Returns the number of elements in cache.
* This number can reflects expired elements too
* due to non realtime cleanup process.
*
*/
@Override
Publisher<Long> size();
}

@ -32,6 +32,30 @@ import org.redisson.core.NodesGroup;
*/
public interface RedissonReactiveClient {
/**
* Returns set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
<V> RSetCacheReactive<V> getSetCache(String name);
/**
* Returns set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
<V> RSetCacheReactive<V> getSetCache(String name, Codec codec);
/**
* Returns map-based cache instance by name
* using provided codec for both cache keys and values.

@ -176,6 +176,7 @@ public interface RedisCommands {
RedisCommand<List<Object>> HVALS = new RedisCommand<List<Object>>("HVALS", new ObjectListReplayDecoder<Object>(), ValueType.MAP_VALUE);
RedisCommand<Boolean> HEXISTS = new RedisCommand<Boolean>("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY);
RedisStrictCommand<Integer> HLEN = new RedisStrictCommand<Integer>("HLEN", new IntegerReplayConvertor());
RedisStrictCommand<Long> HLEN_LONG = new RedisStrictCommand<Long>("HLEN");
RedisCommand<Set<Object>> HKEYS = new RedisCommand<Set<Object>>("HKEYS", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
RedisCommand<Void> HMSET = new RedisCommand<Void>("HMSET", new VoidReplayConvertor(), 2, ValueType.MAP);
RedisCommand<List<Object>> HMGET = new RedisCommand<List<Object>>("HMGET", new ObjectListReplayDecoder<Object>(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);

@ -17,27 +17,25 @@ package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.api.RCollectionReactive;
import reactor.fn.BiFunction;
import reactor.fn.Function;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.action.support.DefaultSubscriber;
public class PublisherAdder<V> {
abstract class RedissonCollectionReactive<V> extends RedissonExpirableReactive {
private final RCollectionReactive<V> destination;
RedissonCollectionReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
public PublisherAdder(RCollectionReactive<V> destination) {
this.destination = destination;
}
RedissonCollectionReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) {
super(codec, connectionManager, name);
public Long sum(Long first, Long second) {
return first + second;
}
protected Publisher<Long> addAll(Publisher<? extends V> c, final Function<V, Publisher<Long>> function, final BiFunction<Long, Long, Long> sizeFunc) {
public Publisher<Long> addAll(Publisher<? extends V> c) {
final Promise<Long> promise = Promises.prepare();
c.subscribe(new DefaultSubscriber<V>() {
@ -55,7 +53,7 @@ abstract class RedissonCollectionReactive<V> extends RedissonExpirableReactive {
@Override
public void onNext(V o) {
lastValue = o;
function.apply(o).subscribe(new DefaultSubscriber<Long>() {
destination.add(o).subscribe(new DefaultSubscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
@ -69,7 +67,7 @@ abstract class RedissonCollectionReactive<V> extends RedissonExpirableReactive {
@Override
public void onNext(Long o) {
lastSize = sizeFunc.apply(lastSize, o);
lastSize = sum(lastSize, o);
}
@Override

@ -34,6 +34,7 @@ import org.redisson.api.RMapReactive;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.client.codec.Codec;
@ -155,6 +156,16 @@ public class RedissonBatchReactive implements RBatchReactive {
return new RedissonAtomicLongReactive(executorService, name);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
return new RedissonSetCacheReactive<V>(evictionScheduler, executorService, name);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
return new RedissonSetCacheReactive<V>(codec, evictionScheduler, executorService, name);
}
@Override
public <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSetReactive<V>(executorService, name);

@ -38,7 +38,7 @@ abstract class RedissonExpirableReactive extends RedissonObjectReactive implemen
@Override
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIRE, getName(), timeUnit.toSeconds(timeToLive));
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIRE, getName(), timeUnit.toMillis(timeToLive));
}
@Override

@ -25,9 +25,6 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.BiFunction;
import reactor.fn.Function;
public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactive<String> implements RLexSortedSetReactive {
public RedissonLexSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) {
@ -36,17 +33,7 @@ public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactiv
@Override
public Publisher<Long> addAll(Publisher<? extends String> c) {
return addAll(c, new Function<String, Publisher<Long>>() {
@Override
public Publisher<Long> apply(String o) {
return add(o);
}
}, new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long left, Long right) {
return left + right;
}
});
return new PublisherAdder<String>(this).addAll(c);
}
@Override

@ -33,8 +33,8 @@ import org.reactivestreams.Subscription;
import org.redisson.api.RListReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
@ -54,7 +54,7 @@ import reactor.rx.subscription.ReactiveSubscription;
*
* @param <V> the type of elements held in this collection
*/
public class RedissonListReactive<V> extends RedissonCollectionReactive<V> implements RListReactive<V> {
public class RedissonListReactive<V> extends RedissonExpirableReactive implements RListReactive<V> {
public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
@ -175,17 +175,14 @@ public class RedissonListReactive<V> extends RedissonCollectionReactive<V> imple
@Override
public Publisher<Long> addAll(Publisher<? extends V> c) {
return addAll(c, new Function<V, Publisher<Long>>() {
@Override
public Publisher<Long> apply(V o) {
return add(o);
}
}, new BiFunction<Long, Long, Long>() {
return new PublisherAdder<V>(this) {
@Override
public Long apply(Long left, Long right) {
return right;
public Long sum(Long first, Long second) {
return second;
}
});
}.addAll(c);
}
@Override
@ -276,10 +273,6 @@ public class RedissonListReactive<V> extends RedissonCollectionReactive<V> imple
return commandExecutor.readReactive(getName(), codec, LINDEX, getName(), index);
}
private boolean isPositionInRange(long index, long size) {
return index >= 0 && index <= size;
}
@Override
public Publisher<V> set(long index, V element) {
return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand<Object>("EVAL", 5),

@ -278,7 +278,6 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
}
});
}
@Override
@ -351,7 +350,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" +
"redis.call('pexpire', KEYS[2], ARGV[1]); " +
"return redis.call('pexpire', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeUnit.toSeconds(timeToLive));
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeUnit.toMillis(timeToLive));
}
@Override

@ -16,14 +16,11 @@
package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RObjectReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.rx.Stream;
import reactor.rx.Streams;

@ -17,29 +17,22 @@ package org.redisson.reactive;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.rx.Stream;
public class RedissonScoredSortedSetReactive<V> extends RedissonCollectionReactive<V> implements RScoredSortedSetReactive<V> {
public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactive implements RScoredSortedSetReactive<V> {
public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
@ -135,107 +128,11 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonCollectionReacti
@Override
public Publisher<V> iterator() {
return new Stream<V>() {
return new SetReactiveIterator<V>() {
@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new SubscriberBarrier<V, V>(t) {
private List<V> firstValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
private List<V> prevValues = new ArrayList<V>();
@Override
protected void doRequest(long n) {
currentIndex = n;
if (!prevValues.isEmpty()) {
List<V> vals = new ArrayList<V>(prevValues);
prevValues.clear();
handle(vals);
if (currentIndex == 0) {
return;
}
}
nextValues();
}
private void handle(List<V> vals) {
for (V val : vals) {
if (currentIndex > 0) {
onNext(val);
} else {
prevValues.add(val);
}
currentIndex--;
if (currentIndex == 0) {
onComplete();
}
}
}
protected void nextValues() {
final SubscriberBarrier<V, V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(ListScanResult<V> res) {
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
m.onComplete();
currentIndex = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
handle(res.getValues());
if (currentIndex == 0) {
return;
}
if (nextIterPos == -1) {
m.onComplete();
currentIndex = 0;
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
}
@Override
public void onComplete() {
if (currentIndex == 0) {
return;
}
nextValues();
}
});
}
});
protected Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};
}

@ -0,0 +1,373 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.EvictionScheduler;
import org.redisson.api.RSetCacheReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.command.CommandReactiveExecutor;
import net.openhft.hashing.LongHashFunction;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.action.support.DefaultSubscriber;
/**
* <p>Set-based cache with ability to set TTL for each entry via
* {@link #put(Object, Object, long, TimeUnit)} method.
* And therefore has an complex lua-scripts inside.
* Uses map(value_hash, value) to tie with sorted set which contains expiration record for every value with TTL.
* </p>
*
* <p>Current Redis implementation doesn't have set entry eviction functionality.
* Thus values are checked for TTL expiration during any value read operation.
* If entry expired then it doesn't returns and clean task runs hronous.
* Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p>
*
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.</p>
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive implements RSetCacheReactive<V> {
private static final RedisCommand<Void> ADD_ALL = new RedisCommand<Void>("HMSET", new VoidReplayConvertor());
private static final RedisCommand<Long> EVAL_ADD = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
private static final RedisStrictCommand<Boolean> HDEL = new RedisStrictCommand<Boolean>("HDEL", new BooleanReplayConvertor());
private final EvictionScheduler evictionScheduler;
public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.evictionScheduler = evictionScheduler;
evictionScheduler.schedule(getName(), getTimeoutSetName());
}
public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
this.evictionScheduler = evictionScheduler;
evictionScheduler.schedule(getName(), getTimeoutSetName());
}
@Override
public Publisher<Long> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.HLEN_LONG, getName());
}
private byte[] hash(Object o) {
if (o == null) {
throw new NullPointerException("Value can't be null");
}
try {
byte[] objectState = codec.getValueEncoder().encode(o);
return hash(objectState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private byte[] hash(byte[] objectState) {
long h1 = LongHashFunction.farmUo().hashBytes(objectState);
long h2 = LongHashFunction.xx_r39().hashBytes(objectState);
return ByteBuffer.allocate((2 * Long.SIZE) / Byte.SIZE)
.putLong(h1)
.putLong(h2)
.array();
}
String getTimeoutSetName() {
return "redisson__timeout__set__{" + getName() + "}";
}
@Override
public Publisher<Boolean> contains(Object o) {
Promise<Boolean> result = Promises.prepare();
byte[] key = hash(o);
Publisher<List<Object>> future = commandExecutor.evalReadReactive(getName(), codec, EVAL_CONTAINS_KEY,
"local value = redis.call('hexists', KEYS[1], KEYS[3]); " +
"local expireDate = 92233720368547758; " +
"if value == 1 then " +
"local expireDateScore = redis.call('zscore', KEYS[2], KEYS[3]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; " +
"end;" +
"return {expireDate, value}; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), key));
addExpireListener(result, future, new BooleanReplayConvertor(), false);
return result;
}
private <T> void addExpireListener(final Promise<T> result, Publisher<List<Object>> publisher, final Convertor<T> convertor, final T nullValue) {
publisher.subscribe(new DefaultSubscriber<List<Object>>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onNext(List<Object> res) {
Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
result.onNext(nullValue);
result.onComplete();
evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
return;
}
if (convertor != null) {
result.onNext((T) convertor.convert(res.get(1)));
} else {
result.onNext((T) res.get(1));
}
result.onComplete();
}
@Override
public void onError(Throwable t) {
result.onError(t);
}
});
}
Publisher<ListScanResult<V>> scanIterator(InetSocketAddress client, long startPos) {
return commandExecutor.evalReadReactive(client, getName(), codec, RedisCommands.EVAL_SSCAN,
"local result = {}; "
+ "local res = redis.call('hscan', KEYS[1], ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "
+ "if i % 2 == 0 then "
+ "local key = res[2][i-1]; "
+ "local expireDate = redis.call('zscore', KEYS[2], key); "
+ "if (expireDate == false) or (expireDate ~= false and tonumber(expireDate) > tonumber(ARGV[2])) then "
+ "table.insert(result, value); "
+ "end; "
+ "end; "
+ "end;"
+ "return {res[1], result};", Arrays.<Object>asList(getName(), getTimeoutSetName()), startPos, System.currentTimeMillis());
}
@Override
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos);
}
};
}
@Override
public Publisher<Boolean> add(V value, long ttl, TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("TimeUnit param can't be null");
}
try {
byte[] objectState = encode(value);
byte[] key = hash(objectState);
long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"redis.call('zadd', KEYS[2], ARGV[1], KEYS[3]); " +
"if redis.call('hexists', KEYS[1], KEYS[3]) == 0 then " +
"redis.call('hset', KEYS[1], KEYS[3], ARGV[2]); " +
"return 1; " +
"end;" +
"return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), key), timeoutDate, objectState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private byte[] encode(V value) throws IOException {
return codec.getValueEncoder().encode(value);
}
@Override
public Publisher<Long> add(V e) {
byte[] key = hash(e);
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_ADD,
"if redis.call('hexists', KEYS[1], KEYS[2]) == 0 then " +
"redis.call('hset', KEYS[1], KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0; ",
Arrays.<Object>asList(getName(), key), e);
}
@Override
public Publisher<Boolean> remove(Object o) {
byte[] key = hash(o);
return commandExecutor.writeReactive(getName(), codec, HDEL, getName(), key);
}
@Override
public Publisher<Boolean> containsAll(Collection<?> c) {
return commandExecutor.evalReadReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == s[i] then "
+ "table.remove(ARGV, j) "
+ "end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Publisher<Long> addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return newSucceeded(0L);
}
List<Object> params = new ArrayList<Object>(c.size()*2 + 1);
params.add(getName());
try {
for (V value : c) {
byte[] objectState = encode(value);
byte[] key = hash(objectState);
params.add(key);
params.add(objectState);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return commandExecutor.writeReactive(getName(), codec, ADD_ALL, params.toArray());
}
@Override
public Publisher<Boolean> retainAll(Collection<?> c) {
List<byte[]> params = new ArrayList<byte[]>(c.size());
for (Object object : c) {
params.add(hash(object));
}
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local keys = redis.call('hkeys', KEYS[1]); " +
"local i=1;" +
"while i <= #keys do "
+ "local changed = false;"
+ "local element = keys[i];"
+ "for j, argElement in pairs(ARGV) do "
+ "if argElement == element then "
+ "changed = true;"
+ "table.remove(keys, i); "
+ "table.remove(ARGV, j); "
+ "break; "
+ "end; "
+ "end; " +
"if changed == false then " +
"i = i + 1 " +
"end " +
"end " +
"if #keys > 0 then "
+ "for i=1, #keys,5000 do "
+ "redis.call('hdel', KEYS[1], unpack(keys, i, math.min(i+4999, #keys))); "
+ "redis.call('zrem', KEYS[2], unpack(keys, i, math.min(i+4999, #keys))); "
+ "end "
+ "return 1;"
+ "end; "
+ "return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), params.toArray());
}
@Override
public Publisher<Boolean> removeAll(Collection<?> c) {
List<Object> params = new ArrayList<Object>(c.size()+1);
params.add(getName());
for (Object object : c) {
params.add(hash(object));
}
return commandExecutor.writeReactive(getName(), codec, HDEL, params.toArray());
}
@Override
public Publisher<Boolean> delete() {
return commandExecutor.writeReactive(getName(), RedisCommands.DEL_SINGLE, getName(), getTimeoutSetName());
}
@Override
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteReactive(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" +
"redis.call('pexpire', KEYS[2], ARGV[1]); "
+ "return redis.call('pexpire', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeUnit.toMillis(timeToLive));
}
@Override
public Publisher<Boolean> expireAt(long timestamp) {
return commandExecutor.evalWriteReactive(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" +
"redis.call('pexpireat', KEYS[2], ARGV[1]); "
+ "return redis.call('pexpireat', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timestamp);
}
@Override
public Publisher<Boolean> clearExpire() {
return commandExecutor.evalWriteReactive(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zrem', KEYS[2], 'redisson__expiretag'); " +
"redis.call('persist', KEYS[2]); "
+ "return redis.call('persist', KEYS[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()));
}
@Override
public Publisher<Long> addAll(Publisher<? extends V> c) {
return new PublisherAdder<V>(this).addAll(c);
}
}

@ -22,19 +22,12 @@ import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RSetReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.fn.BiFunction;
import reactor.fn.Function;
import reactor.rx.Stream;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
*
@ -42,7 +35,7 @@ import reactor.rx.Stream;
*
* @param <V> value
*/
public class RedissonSetReactive<V> extends RedissonCollectionReactive<V> implements RSetReactive<V> {
public class RedissonSetReactive<V> extends RedissonExpirableReactive implements RSetReactive<V> {
public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
@ -54,17 +47,7 @@ public class RedissonSetReactive<V> extends RedissonCollectionReactive<V> implem
@Override
public Publisher<Long> addAll(Publisher<? extends V> c) {
return addAll(c, new Function<V, Publisher<Long>>() {
@Override
public Publisher<Long> apply(V o) {
return add(o);
}
}, new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long left, Long right) {
return left + right;
}
});
return new PublisherAdder<V>(this).addAll(c);
}
@Override
@ -157,107 +140,11 @@ public class RedissonSetReactive<V> extends RedissonCollectionReactive<V> implem
@Override
public Publisher<V> iterator() {
return new Stream<V>() {
return new SetReactiveIterator<V>() {
@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new SubscriberBarrier<V, V>(t) {
private List<V> firstValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
private List<V> prevValues = new ArrayList<V>();
@Override
protected void doRequest(long n) {
currentIndex = n;
if (!prevValues.isEmpty()) {
List<V> vals = new ArrayList<V>(prevValues);
prevValues.clear();
handle(vals);
if (currentIndex == 0) {
return;
}
}
nextValues();
}
private void handle(List<V> vals) {
for (V val : vals) {
if (currentIndex > 0) {
onNext(val);
} else {
prevValues.add(val);
}
currentIndex--;
if (currentIndex == 0) {
onComplete();
}
}
}
protected void nextValues() {
final SubscriberBarrier<V, V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(ListScanResult<V> res) {
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
m.onComplete();
currentIndex = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
handle(res.getValues());
if (currentIndex == 0) {
return;
}
if (nextIterPos == -1) {
m.onComplete();
currentIndex = 0;
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
}
@Override
public void onComplete() {
if (currentIndex == 0) {
return;
}
nextValues();
}
});
}
});
protected Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};
}

@ -0,0 +1,133 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.rx.Stream;
public abstract class SetReactiveIterator<V> extends Stream<V> {
@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new SubscriberBarrier<V, V>(t) {
private List<V> firstValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
private List<V> prevValues = new ArrayList<V>();
@Override
protected void doRequest(long n) {
currentIndex = n;
if (!prevValues.isEmpty()) {
List<V> vals = new ArrayList<V>(prevValues);
prevValues.clear();
handle(vals);
if (currentIndex == 0) {
return;
}
}
nextValues();
}
private void handle(List<V> vals) {
for (V val : vals) {
if (currentIndex > 0) {
onNext(val);
} else {
prevValues.add(val);
}
currentIndex--;
if (currentIndex == 0) {
onComplete();
}
}
}
protected void nextValues() {
final SubscriberBarrier<V, V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(ListScanResult<V> res) {
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
m.onComplete();
currentIndex = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
handle(res.getValues());
if (currentIndex == 0) {
return;
}
if (nextIterPos == -1) {
m.onComplete();
currentIndex = 0;
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
}
@Override
public void onComplete() {
if (currentIndex == 0) {
return;
}
nextValues();
}
});
}
});
}
protected abstract Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos);
}
Loading…
Cancel
Save