Removed ByteBuf usage during collection iteration

pull/1253/merge
Nikita 7 years ago
parent dfa3b4c2dd
commit 5588748cde

@ -23,8 +23,7 @@ import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import org.redisson.misc.HashValue;
/**
*
@ -34,8 +33,8 @@ import io.netty.buffer.ByteBuf;
*/
abstract class RedissonBaseIterator<V> implements Iterator<V> {
private List<ByteBuf> firstValues;
private List<ByteBuf> lastValues;
private List<HashValue> firstValues;
private List<HashValue> lastValues;
private Iterator<ScanObjectEntry> lastIter;
protected long nextIterPos;
protected RedisClient client;
@ -48,9 +47,6 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
public boolean hasNext() {
if (lastIter == null || !lastIter.hasNext()) {
if (finished) {
free(firstValues);
free(lastValues);
currentElementRemoved = false;
client = null;
firstValues = null;
@ -64,9 +60,6 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
}
do {
ListScanResult<ScanObjectEntry> res = iterator(client, nextIterPos);
if (lastValues != null) {
free(lastValues);
}
lastValues = convert(res.getValues());
client = res.getRedisClient();
@ -74,16 +67,16 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
if (nextIterPos == 0 && firstValues == null) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty() && tryAgain()) {
if (isEmpty(firstValues) && tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
}
} else {
if (firstValues.isEmpty()) {
if (isEmpty(firstValues)) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty()) {
if (isEmpty(firstValues)) {
if (tryAgain()) {
client = null;
firstValues = null;
@ -91,18 +84,12 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
continue;
}
if (res.getPos() == 0) {
free(firstValues);
free(lastValues);
finished = true;
return false;
}
}
} else if (lastValues.removeAll(firstValues)
|| (lastValues.isEmpty() && nextIterPos == 0)) {
free(firstValues);
free(lastValues);
} else if (removeAll(lastValues, firstValues)
|| (isEmpty(lastValues) && nextIterPos == 0)) {
currentElementRemoved = false;
client = null;
@ -124,21 +111,20 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
return lastIter.hasNext();
}
private List<ByteBuf> convert(List<ScanObjectEntry> list) {
List<ByteBuf> result = new ArrayList<ByteBuf>(list.size());
for (ScanObjectEntry entry : list) {
result.add(entry.getBuf());
}
return result;
protected boolean isEmpty(List<HashValue> values) {
return values.isEmpty();
}
private void free(List<ByteBuf> list) {
if (list == null) {
return;
}
for (ByteBuf byteBuf : list) {
byteBuf.release();
protected boolean removeAll(List<HashValue> lastValues, List<HashValue> firstValues) {
return lastValues.removeAll(firstValues);
}
protected List<HashValue> convert(List<ScanObjectEntry> list) {
List<HashValue> result = new ArrayList<HashValue>(list.size());
for (ScanObjectEntry entry : list) {
result.add(entry.getHash());
}
return result;
}
protected boolean tryAgain() {

@ -25,8 +25,7 @@ import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import org.redisson.misc.HashValue;
/**
*
@ -38,8 +37,8 @@ import io.netty.buffer.ByteBuf;
*/
public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> firstValues;
private Map<ByteBuf, ByteBuf> lastValues;
private Map<HashValue, HashValue> firstValues;
private Map<HashValue, HashValue> lastValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> lastIter;
protected long nextIterPos;
protected RedisClient client;
@ -52,9 +51,6 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
public boolean hasNext() {
if (lastIter == null || !lastIter.hasNext()) {
if (finished) {
free(firstValues);
free(lastValues);
currentElementRemoved = false;
client = null;
firstValues = null;
@ -68,9 +64,6 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
}
do {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator();
if (lastValues != null) {
free(lastValues);
}
lastValues = convert(res.getMap());
client = res.getRedisClient();
@ -95,18 +88,12 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
continue;
}
if (res.getPos() == 0) {
free(firstValues);
free(lastValues);
finished = true;
return false;
}
}
} else if (lastValues.keySet().removeAll(firstValues.keySet())
|| (lastValues.isEmpty() && nextIterPos == 0)) {
free(firstValues);
free(lastValues);
currentElementRemoved = false;
client = null;
@ -135,20 +122,10 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
protected abstract MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator();
private void free(Map<ByteBuf, ByteBuf> map) {
if (map == null) {
return;
}
for (Entry<ByteBuf, ByteBuf> entry : map.entrySet()) {
entry.getKey().release();
entry.getValue().release();
}
}
private Map<ByteBuf, ByteBuf> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<ByteBuf, ByteBuf> result = new HashMap<ByteBuf, ByteBuf>(map.size());
private Map<HashValue, HashValue> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<HashValue, HashValue> result = new HashMap<HashValue, HashValue>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getBuf(), entry.getValue().getBuf());
result.put(entry.getKey().getHash(), entry.getValue().getHash());
}
return result;
}
@ -185,7 +162,7 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
throw new IllegalStateException();
}
firstValues.remove(entry.getKey().getBuf());
firstValues.remove(entry.getKey().getHash());
lastIter.remove();
removeKey();
currentElementRemoved = true;

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
@ -28,12 +27,19 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.HashValue;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
* @param <M> map type
*/
abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> firstKeys;
private Map<HashValue, HashValue> firstKeys;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> keysIter;
protected long keysIterPos = 0;
@ -76,15 +82,12 @@ abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
if (keysIterPos == 0 && firstKeys == null) {
firstKeys = convert(res.getMap());
} else {
Map<ByteBuf, ByteBuf> newValues = convert(res.getMap());
Map<HashValue, HashValue> newValues = convert(res.getMap());
if (newValues.equals(firstKeys)) {
finished = true;
free(firstKeys);
free(newValues);
firstKeys = null;
return false;
}
free(newValues);
}
keysIter = res.getMap().entrySet().iterator();
keysIterPos = res.getPos();
@ -107,17 +110,10 @@ abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
protected abstract Iterator<V> getIterator(String name);
private void free(Map<ByteBuf, ByteBuf> map) {
for (Entry<ByteBuf, ByteBuf> entry : map.entrySet()) {
entry.getKey().release();
entry.getValue().release();
}
}
private Map<ByteBuf, ByteBuf> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<ByteBuf, ByteBuf> result = new HashMap<ByteBuf, ByteBuf>(map.size());
private Map<HashValue, HashValue> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<HashValue, HashValue> result = new HashMap<HashValue, HashValue>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getBuf(), entry.getValue().getBuf());
result.put(entry.getKey().getHash(), entry.getValue().getHash());
}
return result;
}

@ -21,9 +21,10 @@ import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
*
@ -59,13 +60,15 @@ public class MapScanCodec implements Codec {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
buf.markReaderIndex();
long[] hash = Hash.hash128(buf);
buf.resetReaderIndex();
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
Object val = c.getMapValueDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
return new ScanObjectEntry(new HashValue(hash), val);
}
};
}
@ -85,9 +88,11 @@ public class MapScanCodec implements Codec {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
buf.markReaderIndex();
long[] hash = Hash.hash128(buf);
buf.resetReaderIndex();
Object val = delegate.getMapKeyDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
return new ScanObjectEntry(new HashValue(hash), val);
}
};
}

@ -21,9 +21,10 @@ import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
*
@ -43,9 +44,11 @@ public class ScanCodec implements Codec {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
buf.markReaderIndex();
long[] hash = Hash.hash128(buf);
buf.resetReaderIndex();
Object val = delegate.getValueDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
return new ScanObjectEntry(new HashValue(hash), val);
}
};
}

@ -15,7 +15,7 @@
*/
package org.redisson.client.protocol.decoder;
import io.netty.buffer.ByteBuf;
import org.redisson.misc.HashValue;
/**
*
@ -24,16 +24,16 @@ import io.netty.buffer.ByteBuf;
*/
public class ScanObjectEntry {
private final ByteBuf buf;
private final HashValue hash;
private final Object obj;
public ScanObjectEntry(ByteBuf buf, Object obj) {
this.buf = buf;
public ScanObjectEntry(HashValue hash, Object obj) {
this.hash = hash;
this.obj = obj;
}
public ByteBuf getBuf() {
return buf;
public HashValue getHash() {
return hash;
}
public Object getObj() {

@ -986,7 +986,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} else if (o instanceof ScanObjectEntry) {
ScanObjectEntry keyScan = (ScanObjectEntry) o;
Object obj = tryHandleReference0(keyScan.getObj());
return obj != keyScan.getObj() ? (T) new ScanObjectEntry(keyScan.getBuf(), obj) : o;
return obj != keyScan.getObj() ? (T) new ScanObjectEntry(keyScan.getHash(), obj) : o;
} else if (o instanceof Map.Entry) {
Map.Entry old = (Map.Entry) o;
Object key = tryHandleReference0(old.getKey());

@ -0,0 +1,59 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.util.Arrays;
/**
*
* @author Nikita Koksharov
*
*/
public class HashValue {
private final long[] value;
public HashValue(long[] hash) {
this.value = hash;
}
public long[] getValue() {
return value;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(value);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
HashValue other = (HashValue) obj;
if (!Arrays.equals(value, other.value))
return false;
return true;
}
}

@ -26,8 +26,8 @@ import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.HashValue;
import io.netty.buffer.ByteBuf;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
@ -54,7 +54,7 @@ public class RedissonMapReactiveIterator<K, V, M> {
public void subscribe(final Subscriber<? super M> t) {
t.onSubscribe(new ReactiveSubscription<M>(this, t) {
private Map<ByteBuf, ByteBuf> firstValues;
private Map<HashValue, HashValue> firstValues;
private long iterPos = 0;
private RedisClient client;
@ -66,10 +66,10 @@ public class RedissonMapReactiveIterator<K, V, M> {
nextValues();
}
private Map<ByteBuf, ByteBuf> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<ByteBuf, ByteBuf> result = new HashMap<ByteBuf, ByteBuf>(map.size());
private Map<HashValue, HashValue> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<HashValue, HashValue> result = new HashMap<HashValue, HashValue>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getBuf(), entry.getValue().getBuf());
result.put(entry.getKey().getHash(), entry.getValue().getHash());
}
return result;
}
@ -83,23 +83,12 @@ public class RedissonMapReactiveIterator<K, V, M> {
s.request(Long.MAX_VALUE);
}
private void free(Map<ByteBuf, ByteBuf> map) {
if (map == null) {
return;
}
for (Entry<ByteBuf, ByteBuf> entry : map.entrySet()) {
entry.getKey().release();
entry.getValue().release();
}
}
@Override
public void onNext(MapScanResult<ScanObjectEntry, ScanObjectEntry> res) {
client = res.getRedisClient();
if (iterPos == 0 && firstValues == null) {
firstValues = convert(res.getMap());
} else if (convert(res.getMap()).equals(firstValues)) {
free(firstValues);
m.onComplete();
currentIndex = 0;
return;

@ -15,7 +15,6 @@
*/
package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@ -25,8 +24,8 @@ import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.HashValue;
import io.netty.buffer.ByteBuf;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
@ -42,8 +41,8 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
private List<ByteBuf> firstValues;
private List<ByteBuf> lastValues;
private List<HashValue> firstValues;
private List<HashValue> lastValues;
private long nextIterPos;
private RedisClient client;
@ -72,9 +71,6 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
@Override
public void onNext(ListScanResult<ScanObjectEntry> res) {
if (finished) {
free(firstValues);
free(lastValues);
client = null;
firstValues = null;
lastValues = null;
@ -83,9 +79,6 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
}
long prevIterPos = nextIterPos;
if (lastValues != null) {
free(lastValues);
}
lastValues = convert(res.getValues());
client = res.getRedisClient();
@ -111,9 +104,6 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
}
}
} else if (lastValues.removeAll(firstValues)) {
free(firstValues);
free(lastValues);
client = null;
firstValues = null;
lastValues = null;
@ -152,19 +142,10 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
});
}
private void free(List<ByteBuf> list) {
if (list == null) {
return;
}
for (ByteBuf byteBuf : list) {
byteBuf.release();
}
}
private List<ByteBuf> convert(List<ScanObjectEntry> list) {
List<ByteBuf> result = new ArrayList<ByteBuf>(list.size());
private List<HashValue> convert(List<ScanObjectEntry> list) {
List<HashValue> result = new ArrayList<HashValue>(list.size());
for (ScanObjectEntry entry : list) {
result.add(entry.getBuf());
result.add(entry.getHash());
}
return result;
}

@ -39,6 +39,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.codec.SerializationCodec;
import org.redisson.config.Config;
import org.redisson.connection.ConnectionListener;
import org.redisson.misc.HashValue;
import io.netty.buffer.Unpooled;
@ -114,7 +115,7 @@ public class RedissonTest {
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
i++;
if (i == 1) {
return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(Unpooled.wrappedBuffer(new byte[] {1}), 1)));
return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(new HashValue(new long[]{1L}) , 1)));
}
if (i == 2) {
return new ListScanResult(7L, Collections.emptyList());

Loading…
Cancel
Save