Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit 73482110e0

@ -0,0 +1,114 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
/**
*
* @author Nikita Koksharov
*
* @param <E> entry type
* @param <V> value type
*/
public abstract class BaseIterator<V, E> implements Iterator<V> {
private Iterator<E> lastIter;
protected long nextIterPos;
protected RedisClient client;
private boolean finished;
private boolean currentElementRemoved;
protected E value;
@Override
public boolean hasNext() {
if (lastIter == null || !lastIter.hasNext()) {
if (finished) {
currentElementRemoved = false;
client = null;
nextIterPos = 0;
if (!tryAgain()) {
return false;
}
finished = false;
}
do {
ScanResult<E> res = iterator(client, nextIterPos);
client = res.getRedisClient();
if (res.getPos() == 0) {
finished = true;
if (res.getValues().isEmpty()) {
currentElementRemoved = false;
client = null;
nextIterPos = 0;
if (tryAgain()) {
continue;
}
return false;
}
}
lastIter = res.getValues().iterator();
nextIterPos = res.getPos();
} while (!lastIter.hasNext());
}
return lastIter.hasNext();
}
protected boolean tryAgain() {
return false;
}
protected abstract ScanResult<E> iterator(RedisClient client, long nextIterPos);
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element");
}
value = lastIter.next();
currentElementRemoved = false;
return getValue(value);
}
protected abstract V getValue(E entry);
@Override
public void remove() {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (lastIter == null || value == null) {
throw new IllegalStateException();
}
lastIter.remove();
remove(value);
currentElementRemoved = true;
}
protected abstract void remove(E value);
}

@ -15,15 +15,7 @@
*/ */
package org.redisson; package org.redisson;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.HashValue;
/** /**
* *
@ -31,134 +23,11 @@ import org.redisson.misc.HashValue;
* *
* @param <V> value type * @param <V> value type
*/ */
abstract class RedissonBaseIterator<V> implements Iterator<V> { abstract class RedissonBaseIterator<V> extends BaseIterator<V, ScanObjectEntry> {
private List<HashValue> firstValues;
private List<HashValue> lastValues;
private Iterator<ScanObjectEntry> lastIter;
protected long nextIterPos;
protected RedisClient client;
private boolean finished;
private boolean currentElementRemoved;
private V value;
@Override
public boolean hasNext() {
if (lastIter == null || !lastIter.hasNext()) {
if (finished) {
currentElementRemoved = false;
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
if (!tryAgain()) {
return false;
}
finished = false;
}
do {
ListScanResult<ScanObjectEntry> res = iterator(client, nextIterPos);
lastValues = convert(res.getValues());
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = lastValues;
lastValues = null;
if (isEmpty(firstValues) && tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
}
} else {
if (isEmpty(firstValues)) {
firstValues = lastValues;
lastValues = null;
if (isEmpty(firstValues)) {
if (tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
continue;
}
if (res.getPos() == 0) {
finished = true;
return false;
}
}
} else if (removeAll(lastValues, firstValues)
|| (isEmpty(lastValues) && nextIterPos == 0)) {
currentElementRemoved = false;
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
if (tryAgain()) {
continue;
}
finished = true;
return false;
}
}
lastIter = res.getValues().iterator();
nextIterPos = res.getPos();
} while (!lastIter.hasNext());
}
return lastIter.hasNext();
}
protected boolean isEmpty(List<HashValue> values) {
return values.isEmpty();
}
protected boolean removeAll(List<HashValue> lastValues, List<HashValue> firstValues) {
return lastValues.removeAll(firstValues);
}
protected List<HashValue> convert(List<ScanObjectEntry> list) {
List<HashValue> result = new ArrayList<HashValue>(list.size());
for (ScanObjectEntry entry : list) {
result.add(entry.getHash());
}
return result;
}
protected boolean tryAgain() {
return false;
}
abstract ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos);
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element");
}
value = (V) lastIter.next().getObj();
currentElementRemoved = false;
return value;
}
@Override @Override
public void remove() { protected V getValue(ScanObjectEntry entry) {
if (currentElementRemoved) { return (V) entry.getObj();
throw new IllegalStateException("Element been already deleted");
}
if (lastIter == null) {
throw new IllegalStateException();
} }
firstValues.remove(value);
lastIter.remove();
remove(value);
currentElementRemoved = true;
}
abstract void remove(V value);
} }

@ -16,161 +16,31 @@
package org.redisson; package org.redisson;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.HashValue;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *
* @param <K> key type
* @param <V> value type * @param <V> value type
* @param <M> loaded value type
*/ */
public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> { public abstract class RedissonBaseMapIterator<V> extends BaseIterator<V, Map.Entry<ScanObjectEntry, ScanObjectEntry>> {
private Map<HashValue, HashValue> firstValues;
private Map<HashValue, HashValue> lastValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> lastIter;
protected long nextIterPos;
protected RedisClient client;
private boolean finished;
private boolean currentElementRemoved;
protected Map.Entry<ScanObjectEntry, ScanObjectEntry> entry;
@Override
public boolean hasNext() {
if (lastIter == null || !lastIter.hasNext()) {
if (finished) {
currentElementRemoved = false;
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
if (!tryAgain()) {
return false;
}
finished = false;
}
do {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator();
lastValues = convert(res.getMap());
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty() && tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
}
} else {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty()) {
if (tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
continue;
}
if (res.getPos() == 0) {
finished = true;
return false;
}
}
} else if (lastValues.keySet().removeAll(firstValues.keySet())
|| (lastValues.isEmpty() && nextIterPos == 0)) {
currentElementRemoved = false;
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
if (tryAgain()) {
continue;
}
finished = true;
return false;
}
}
lastIter = res.getMap().entrySet().iterator();
nextIterPos = res.getPos();
} while (!lastIter.hasNext());
}
return lastIter.hasNext();
}
protected boolean tryAgain() {
return false;
}
protected abstract MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator();
private Map<HashValue, HashValue> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<HashValue, HashValue> result = new HashMap<HashValue, HashValue>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getHash(), entry.getValue().getHash());
}
return result;
}
@Override
public M next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
entry = lastIter.next();
currentElementRemoved = false;
return getValue(entry);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected M getValue(final Entry<ScanObjectEntry, ScanObjectEntry> entry) { protected V getValue(final Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey().getObj(), (V)entry.getValue().getObj()) { return (V)new AbstractMap.SimpleEntry(entry.getKey().getObj(), entry.getValue().getObj()) {
@Override @Override
public V setValue(V value) { public Object setValue(Object value) {
return put(entry, value); return put(entry, value);
} }
}; };
} }
@Override protected abstract Object put(Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value);
public void remove() {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (lastIter == null || entry == null) {
throw new IllegalStateException();
}
firstValues.remove(entry.getKey().getHash());
lastIter.remove();
removeKey();
currentElementRemoved = true;
entry = null;
}
protected abstract void removeKey();
protected abstract V put(Entry<ScanObjectEntry, ScanObjectEntry> entry, V value);
} }

@ -199,6 +199,10 @@ public class RedissonExecutorService implements RScheduledExecutorService {
public int countActiveWorkers() { public int countActiveWorkers() {
String id = generateRequestId(); String id = generateRequestId();
int subscribers = (int) workersTopic.publish(id); int subscribers = (int) workersTopic.publish(id);
if (subscribers == 0) {
return 0;
}
RSemaphore semaphore = redisson.getSemaphore(workersSemaphoreName + ":" + id); RSemaphore semaphore = redisson.getSemaphore(workersSemaphoreName + ":" + id);
try { try {
semaphore.tryAcquire(subscribers, 10, TimeUnit.MINUTES); semaphore.tryAcquire(subscribers, 10, TimeUnit.MINUTES);

@ -121,13 +121,13 @@ public class RedissonKeys implements RKeys {
return new RedissonBaseIterator<String>() { return new RedissonBaseIterator<String>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) { protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count); return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count);
} }
@Override @Override
void remove(String value) { protected void remove(ScanObjectEntry value) {
RedissonKeys.this.delete(value); RedissonKeys.this.delete((String)value.getObj());
} }
}; };

@ -1102,7 +1102,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} }
protected Iterator<K> keyIterator(String pattern) { protected Iterator<K> keyIterator(String pattern) {
return new RedissonMapIterator<K, V, K>(RedissonMap.this, pattern) { return new RedissonMapIterator<K>(RedissonMap.this, pattern) {
@Override @Override
protected K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) { protected K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj(); return (K) entry.getKey().getObj();
@ -1153,7 +1153,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} }
protected Iterator<V> valueIterator(String pattern) { protected Iterator<V> valueIterator(String pattern) {
return new RedissonMapIterator<K, V, V>(RedissonMap.this, pattern) { return new RedissonMapIterator<V>(RedissonMap.this, pattern) {
@Override @Override
protected V getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) { protected V getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj(); return (V) entry.getValue().getObj();
@ -1200,7 +1200,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} }
protected Iterator<Map.Entry<K,V>> entryIterator(String pattern) { protected Iterator<Map.Entry<K,V>> entryIterator(String pattern) {
return new RedissonMapIterator<K, V, Map.Entry<K, V>>(RedissonMap.this, pattern); return new RedissonMapIterator<Map.Entry<K, V>>(RedissonMap.this, pattern);
} }
private void loadValue(final K key, final RPromise<V> result, final boolean replaceValue) { private void loadValue(final K key, final RPromise<V> result, final boolean replaceValue) {

@ -17,37 +17,38 @@ package org.redisson;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *
* @param <K> key type
* @param <V> value type
* @param <M> loaded value type * @param <M> loaded value type
*/ */
public class RedissonMapIterator<K, V, M> extends RedissonBaseMapIterator<K, V, M> { public class RedissonMapIterator<M> extends RedissonBaseMapIterator<M> {
private final RedissonMap<K, V> map; private final RedissonMap map;
private final String pattern; private final String pattern;
public RedissonMapIterator(RedissonMap<K, V> map, String pattern) { public RedissonMapIterator(RedissonMap map, String pattern) {
this.map = map; this.map = map;
this.pattern = pattern; this.pattern = pattern;
} }
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() { @Override
return map.scanIterator(map.getName(), client, nextIterPos, pattern); protected Object put(Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
return map.put(entry.getKey().getObj(), value);
} }
protected void removeKey() { @Override
map.fastRemove((K)entry.getKey().getObj()); protected ScanResult<Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client, long nextIterPos) {
return map.scanIterator(map.getName(), client, nextIterPos, pattern);
} }
protected V put(Entry<ScanObjectEntry, ScanObjectEntry> entry, V value) { @Override
return map.put((K) entry.getKey().getObj(), value); protected void remove(Entry<ScanObjectEntry, ScanObjectEntry> value) {
map.fastRemove(value.getKey().getObj());
} }
} }

@ -17,28 +17,35 @@ package org.redisson;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
public class RedissonMultiMapKeysIterator<K, V, M> extends RedissonBaseMapIterator<K, V, M> { /**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public class RedissonMultiMapKeysIterator<V> extends RedissonBaseMapIterator<V> {
private final RedissonMultimap<K, V> map; private final RedissonMultimap map;
public RedissonMultiMapKeysIterator(RedissonMultimap<K, V> map) { public RedissonMultiMapKeysIterator(RedissonMultimap map) {
this.map = map; this.map = map;
} }
@Override
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() { protected Object put(Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
return map.scanIterator(client, nextIterPos); return map.put(entry.getKey().getObj(), value);
} }
protected void removeKey() { @Override
map.fastRemove((K)entry.getKey().getObj()); protected ScanResult<Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client, long nextIterPos) {
return map.scanIterator(client, nextIterPos);
} }
protected V put(Entry<ScanObjectEntry, ScanObjectEntry> entry, V value) { @Override
map.put((K) entry.getKey().getObj(), value); protected void remove(Entry<ScanObjectEntry, ScanObjectEntry> value) {
return null; map.fastRemove(value.getKey().getObj());
} }
} }

@ -318,7 +318,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
@Override @Override
public Iterator<K> iterator() { public Iterator<K> iterator() {
return new RedissonMultiMapKeysIterator<K, V, K>(RedissonMultimap.this) { return new RedissonMultiMapKeysIterator<K>(RedissonMultimap.this) {
@Override @Override
protected K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) { protected K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj(); return (K) entry.getKey().getObj();

@ -322,13 +322,13 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) { protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos); return scanIterator(client, nextIterPos);
} }
@Override @Override
void remove(V value) { protected void remove(ScanObjectEntry value) {
RedissonScoredSortedSet.this.remove(value); RedissonScoredSortedSet.this.remove((V)value.getObj());
} }
}; };

@ -288,6 +288,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
if (permits < 0) { if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative"); throw new IllegalArgumentException("Permits amount can't be negative");
} }
if (permits == 0) {
return RedissonPromise.newSucceededFuture(true);
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " + "local value = redis.call('get', KEYS[1]); " +
@ -469,6 +472,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
if (permits < 0) { if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative"); throw new IllegalArgumentException("Permits amount can't be negative");
} }
if (permits == 0) {
return RedissonPromise.newSucceededFuture(null);
}
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " + "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +

@ -109,13 +109,13 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) { protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern); return scanIterator(getName(), client, nextIterPos, pattern);
} }
@Override @Override
void remove(V value) { protected void remove(ScanObjectEntry value) {
RedissonSet.this.remove(value); RedissonSet.this.remove((V)value.getObj());
} }
}; };

@ -161,13 +161,13 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) { protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern); return scanIterator(getName(), client, nextIterPos, pattern);
} }
@Override @Override
void remove(V value) { protected void remove(ScanObjectEntry value) {
RedissonSetCache.this.remove(value); RedissonSetCache.this.remove((V)value.getObj());
} }
}; };

@ -203,13 +203,13 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) { protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos, pattern); return scanIterator(client, nextIterPos, pattern);
} }
@Override @Override
void remove(V value) { protected void remove(ScanObjectEntry value) {
RedissonSetMultimapValues.this.remove(value); RedissonSetMultimapValues.this.remove((V)value.getObj());
} }
}; };

@ -15,6 +15,8 @@
*/ */
package org.redisson; package org.redisson;
import java.util.Collection;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
/** /**
@ -22,10 +24,14 @@ import org.redisson.client.RedisClient;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public interface RedisClientResult { public interface ScanResult<R> {
void setRedisClient(RedisClient addr); void setRedisClient(RedisClient addr);
RedisClient getRedisClient(); RedisClient getRedisClient();
long getPos();
Collection<R> getValues();
} }

@ -15,8 +15,10 @@
*/ */
package org.redisson.client; package org.redisson.client;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -45,6 +47,7 @@ import io.netty.resolver.AddressResolver;
import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProviders; import io.netty.resolver.dns.DnsServerAddressStreamProviders;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import io.netty.util.NetUtil;
import io.netty.util.Timer; import io.netty.util.Timer;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
@ -166,6 +169,17 @@ public class RedisClient {
return resolvedAddrFuture.get(); return resolvedAddrFuture.get();
} }
byte[] addr = NetUtil.createByteArrayFromIpAddressString(uri.getHost());
if (addr != null) {
try {
resolvedAddr = new InetSocketAddress(InetAddress.getByAddress(uri.getHost(), addr), uri.getPort());
} catch (UnknownHostException e) {
// skip
}
promise.trySuccess(resolvedAddr);
return promise;
}
AddressResolver<InetSocketAddress> resolver = (AddressResolver<InetSocketAddress>) bootstrap.config().resolver().getResolver(bootstrap.config().group().next()); AddressResolver<InetSocketAddress> resolver = (AddressResolver<InetSocketAddress>) bootstrap.config().resolver().getResolver(bootstrap.config().group().next());
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort())); Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() { resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@ -176,13 +190,24 @@ public class RedisClient {
return; return;
} }
resolvedAddr = future.getNow(); InetSocketAddress resolved = future.getNow();
promise.trySuccess(future.getNow()); resolvedAddr = createInetSocketAddress(resolved, uri.getHost());
promise.trySuccess(resolvedAddr);
} }
}); });
return promise; return promise;
} }
private InetSocketAddress createInetSocketAddress(InetSocketAddress resolved, String host) {
byte[] addr = NetUtil.createByteArrayFromIpAddressString(resolved.getAddress().getHostAddress());
try {
return new InetSocketAddress(InetAddress.getByAddress(host, addr), resolved.getPort());
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
public RFuture<RedisConnection> connectAsync() { public RFuture<RedisConnection> connectAsync() {
final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>(); final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();

@ -33,7 +33,6 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig; import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.config.SslProvider; import org.redisson.config.SslProvider;
import org.redisson.misc.URIBuilder;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -46,6 +45,7 @@ import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent; import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.NetUtil;
/** /**
* *
@ -164,7 +164,7 @@ public class RedisChannelInitializer extends ChannelInitializer<Channel> {
SslContext sslContext = sslContextBuilder.build(); SslContext sslContext = sslContextBuilder.build();
String hostname = config.getSslHostname(); String hostname = config.getSslHostname();
if (hostname == null || URIBuilder.isValidIP(hostname)) { if (hostname == null || NetUtil.createByteArrayFromIpAddressString(hostname) != null) {
hostname = config.getAddress().getHost(); hostname = config.getAddress().getHost();
} }

@ -15,10 +15,9 @@
*/ */
package org.redisson.client.protocol.decoder; package org.redisson.client.protocol.decoder;
import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import org.redisson.RedisClientResult; import org.redisson.ScanResult;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
/** /**
@ -27,21 +26,23 @@ import org.redisson.client.RedisClient;
* *
* @param <V> value type * @param <V> value type
*/ */
public class ListScanResult<V> implements RedisClientResult { public class ListScanResult<V> implements ScanResult<V> {
private final Long pos; private final long pos;
private final List<V> values; private final List<V> values;
private RedisClient client; private RedisClient client;
public ListScanResult(Long pos, List<V> values) { public ListScanResult(long pos, List<V> values) {
this.pos = pos; this.pos = pos;
this.values = values; this.values = values;
} }
public Long getPos() { @Override
public long getPos() {
return pos; return pos;
} }
@Override
public List<V> getValues() { public List<V> getValues() {
return values; return values;
} }
@ -51,6 +52,7 @@ public class ListScanResult<V> implements RedisClientResult {
this.client = client; this.client = client;
} }
@Override
public RedisClient getRedisClient() { public RedisClient getRedisClient() {
return client; return client;
} }

@ -15,10 +15,10 @@
*/ */
package org.redisson.client.protocol.decoder; package org.redisson.client.protocol.decoder;
import java.net.InetSocketAddress; import java.util.Collection;
import java.util.Map; import java.util.Map;
import org.redisson.RedisClientResult; import org.redisson.ScanResult;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
/** /**
@ -28,31 +28,38 @@ import org.redisson.client.RedisClient;
* @param <K> key type * @param <K> key type
* @param <V> value type * @param <V> value type
*/ */
public class MapScanResult<K, V> implements RedisClientResult { public class MapScanResult<K, V> implements ScanResult<Map.Entry<K, V>> {
private final Long pos; private final long pos;
private final Map<K, V> values; private final Map<K, V> values;
private RedisClient client; private RedisClient client;
public MapScanResult(Long pos, Map<K, V> values) { public MapScanResult(long pos, Map<K, V> values) {
super(); super();
this.pos = pos; this.pos = pos;
this.values = values; this.values = values;
} }
public Long getPos() { @Override
return pos; public Collection<Map.Entry<K, V>> getValues() {
return values.entrySet();
} }
public Map<K, V> getMap() { public Map<K, V> getMap() {
return values; return values;
} }
@Override
public long getPos() {
return pos;
}
@Override @Override
public void setRedisClient(RedisClient client) { public void setRedisClient(RedisClient client) {
this.client = client; this.client = client;
} }
@Override
public RedisClient getRedisClient() { public RedisClient getRedisClient() {
return client; return client;
} }

@ -29,7 +29,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedisClientResult; import org.redisson.ScanResult;
import org.redisson.RedissonReference; import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException; import org.redisson.RedissonShutdownException;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
@ -856,8 +856,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (future.isSuccess()) { if (future.isSuccess()) {
R res = future.getNow(); R res = future.getNow();
if (res instanceof RedisClientResult) { if (res instanceof ScanResult) {
((RedisClientResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient()); ((ScanResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient());
} }
if (isRedissonReferenceSupportEnabled()) { if (isRedissonReferenceSupportEnabled()) {

@ -52,6 +52,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.RedissonBaseMapIterator; import org.redisson.RedissonBaseMapIterator;
import org.redisson.RedissonObject; import org.redisson.RedissonObject;
import org.redisson.ScanResult;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RSemaphore; import org.redisson.api.RSemaphore;
@ -2094,25 +2095,26 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
} }
protected Iterator<K> keyIterator() { protected Iterator<K> keyIterator() {
return new RedissonBaseMapIterator<K, V, K>() { return new RedissonBaseMapIterator<K>() {
@Override @Override
protected K getValue(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) { protected K getValue(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj(); return (K) entry.getKey().getObj();
} }
@Override @Override
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() { protected void remove(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> value) {
return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos); throw new UnsupportedOperationException();
} }
@Override @Override
protected void removeKey() { protected Object put(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
protected V put(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry, V value) { protected ScanResult<java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client,
throw new UnsupportedOperationException(); long nextIterPos) {
return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos);
} }
}; };
} }
@ -2412,7 +2414,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
@Override @Override
public Iterator<javax.cache.Cache.Entry<K, V>> iterator() { public Iterator<javax.cache.Cache.Entry<K, V>> iterator() {
checkNotClosed(); checkNotClosed();
return new RedissonBaseMapIterator<K, V, javax.cache.Cache.Entry<K, V>>() { return new RedissonBaseMapIterator<javax.cache.Cache.Entry<K, V>>() {
@Override @Override
protected Cache.Entry<K, V> getValue(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) { protected Cache.Entry<K, V> getValue(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
cacheManager.getStatBean(JCache.this).addHits(1); cacheManager.getStatBean(JCache.this).addHits(1);
@ -2427,19 +2429,23 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
} }
@Override @Override
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() { protected void remove(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos); JCache.this.remove((K) entry.getKey().getObj());
} }
@Override @Override
protected void removeKey() { protected Object put(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry, Object value) {
JCache.this.remove((K) entry.getKey().getObj()); throw new UnsupportedOperationException();
} }
@Override @Override
protected V put(Map.Entry<ScanObjectEntry, ScanObjectEntry> entry, V value) { protected ScanResult<java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry>> iterator(RedisClient client,
throw new UnsupportedOperationException(); long nextIterPos) {
return JCache.this.scanIterator(JCache.this.getName(), client, nextIterPos);
} }
}; };
} }

@ -18,9 +18,12 @@ package org.redisson.misc;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.regex.Pattern; import java.net.UnknownHostException;
import io.netty.util.NetUtil;
/** /**
* *
@ -28,9 +31,6 @@ import java.util.regex.Pattern;
*/ */
public class URIBuilder { public class URIBuilder {
private static final Pattern ipv4Pattern = Pattern.compile("(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])", Pattern.CASE_INSENSITIVE);
private static final Pattern ipv6Pattern = Pattern.compile("([0-9a-f]{1,4}:){7}([0-9a-f]){1,4}", Pattern.CASE_INSENSITIVE);
public static URI create(String uri) { public static URI create(String uri) {
URI u = URI.create(uri); URI u = URI.create(uri);
// Let's assuming most of the time it is OK. // Let's assuming most of the time it is OK.
@ -68,15 +68,11 @@ public class URIBuilder {
} }
} }
public static boolean isValidIP(String host) { public static boolean compare(InetSocketAddress entryAddr, URI addr) {
if (ipv4Pattern.matcher(host).matches()) { if (addr.getHost().equals("localhost")) {
return true; System.out.println("host to compare: " + addr.getHost());
}
return ipv6Pattern.matcher(host).matches();
} }
public static boolean compare(InetSocketAddress entryAddr, URI addr) {
if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(addr.getHost())) if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(addr.getHost()))
|| entryAddr.getAddress().getHostAddress().equals(addr.getHost())) || entryAddr.getAddress().getHostAddress().equals(addr.getHost()))
&& entryAddr.getPort() == addr.getPort()) { && entryAddr.getPort() == addr.getPort()) {

@ -1,7 +1,8 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -9,9 +10,14 @@ import java.util.Set;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.RBucket; import org.redisson.api.RBucket;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RType; import org.redisson.api.RType;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
public class RedissonKeysTest extends BaseTest { public class RedissonKeysTest extends BaseTest {
@ -46,6 +52,67 @@ public class RedissonKeysTest extends BaseTest {
assertThat(redisson.getKeys().getType("test1")).isNull(); assertThat(redisson.getKeys().getType("test1")).isNull();
} }
@Test
public void testEmptyKeys() {
Iterable<String> keysIterator = redisson.getKeys().getKeysByPattern("test*", 10);
assertThat(keysIterator.iterator().hasNext()).isFalse();
}
@Test
public void testKeysByPattern() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
int size = 10000;
for (int i = 0; i < size; i++) {
redisson.getBucket("test" + i).set(i);
}
assertThat(redisson.getKeys().count()).isEqualTo(size);
Long noOfKeysDeleted = 0L;
int chunkSize = 20;
Iterable<String> keysIterator = redisson.getKeys().getKeysByPattern("test*", chunkSize);
Set<String> keys = new HashSet<>();
for (String key : keysIterator) {
keys.add(key);
if (keys.size() % chunkSize == 0) {
long res = redisson.getKeys().delete(keys.toArray(new String[keys.size()]));
assertThat(res).isEqualTo(chunkSize);
noOfKeysDeleted += res;
keys.clear();
}
}
//Delete remaining keys
if (!keys.isEmpty()) {
noOfKeysDeleted += redisson.getKeys().delete(keys.toArray(new String[keys.size()]));
}
assertThat(noOfKeysDeleted).isEqualTo(size);
redisson.shutdown();
process.shutdown();
}
@Test @Test
public void testKeysIterablePattern() { public void testKeysIterablePattern() {
redisson.getBucket("test1").set("someValue"); redisson.getBucket("test1").set("someValue");

@ -2,17 +2,22 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
import org.redisson.api.RSemaphore; import org.redisson.api.RSemaphore;
public class RedissonSemaphoreTest extends BaseConcurrentTest { public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testZero() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
assertThat(s.tryAcquire(0, 10, TimeUnit.MINUTES)).isTrue();
s.release(0);
assertThat(s.availablePermits()).isZero();
}
@Test @Test
public void testAcquireWithoutSetPermits() throws InterruptedException { public void testAcquireWithoutSetPermits() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test"); RSemaphore s = redisson.getSemaphore("test");

@ -116,7 +116,7 @@ public class RedissonTest {
RedissonBaseIterator iter = new RedissonBaseIterator() { RedissonBaseIterator iter = new RedissonBaseIterator() {
int i; int i;
@Override @Override
ListScanResult iterator(RedisClient client, long nextIterPos) { protected ListScanResult iterator(RedisClient client, long nextIterPos) {
i++; i++;
if (i == 1) { if (i == 1) {
return new ListScanResult(13L, Collections.emptyList()); return new ListScanResult(13L, Collections.emptyList());
@ -129,7 +129,7 @@ public class RedissonTest {
} }
@Override @Override
void remove(Object value) { protected void remove(Object value) {
} }
}; };
@ -142,7 +142,7 @@ public class RedissonTest {
RedissonBaseIterator<Integer> iter = new RedissonBaseIterator<Integer>() { RedissonBaseIterator<Integer> iter = new RedissonBaseIterator<Integer>() {
int i; int i;
@Override @Override
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) { protected ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
i++; i++;
if (i == 1) { if (i == 1) {
return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(new HashValue(new long[]{1L}) , 1))); return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(new HashValue(new long[]{1L}) , 1)));
@ -161,7 +161,7 @@ public class RedissonTest {
} }
@Override @Override
void remove(Integer value) { protected void remove(ScanObjectEntry value) {
} }
}; };

Loading…
Cancel
Save