From 5588748cde615959d944cc5b195e84b0225a2c33 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 21 Feb 2018 08:36:20 +0300 Subject: [PATCH] Removed ByteBuf usage during collection iteration --- .../org/redisson/RedissonBaseIterator.java | 52 ++++++---------- .../org/redisson/RedissonBaseMapIterator.java | 37 +++--------- .../redisson/RedissonMultiMapIterator.java | 32 +++++----- .../redisson/client/codec/MapScanCodec.java | 15 +++-- .../org/redisson/client/codec/ScanCodec.java | 9 ++- .../protocol/decoder/ScanObjectEntry.java | 12 ++-- .../redisson/command/CommandAsyncService.java | 2 +- .../java/org/redisson/misc/HashValue.java | 59 +++++++++++++++++++ .../reactive/RedissonMapReactiveIterator.java | 21 ++----- .../reactive/SetReactiveIterator.java | 31 ++-------- .../test/java/org/redisson/RedissonTest.java | 3 +- 11 files changed, 135 insertions(+), 138 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/misc/HashValue.java diff --git a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java index 116624759..e5e1806c2 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java @@ -23,8 +23,7 @@ import java.util.NoSuchElementException; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; - -import io.netty.buffer.ByteBuf; +import org.redisson.misc.HashValue; /** * @@ -34,8 +33,8 @@ import io.netty.buffer.ByteBuf; */ abstract class RedissonBaseIterator implements Iterator { - private List firstValues; - private List lastValues; + private List firstValues; + private List lastValues; private Iterator lastIter; protected long nextIterPos; protected RedisClient client; @@ -48,9 +47,6 @@ abstract class RedissonBaseIterator implements Iterator { public boolean hasNext() { if (lastIter == null || !lastIter.hasNext()) { if (finished) { - free(firstValues); - free(lastValues); - currentElementRemoved = false; client = null; firstValues = null; @@ -64,9 +60,6 @@ abstract class RedissonBaseIterator implements Iterator { } do { ListScanResult res = iterator(client, nextIterPos); - if (lastValues != null) { - free(lastValues); - } lastValues = convert(res.getValues()); client = res.getRedisClient(); @@ -74,16 +67,16 @@ abstract class RedissonBaseIterator implements Iterator { if (nextIterPos == 0 && firstValues == null) { firstValues = lastValues; lastValues = null; - if (firstValues.isEmpty() && tryAgain()) { + if (isEmpty(firstValues) && tryAgain()) { client = null; firstValues = null; nextIterPos = 0; } } else { - if (firstValues.isEmpty()) { + if (isEmpty(firstValues)) { firstValues = lastValues; lastValues = null; - if (firstValues.isEmpty()) { + if (isEmpty(firstValues)) { if (tryAgain()) { client = null; firstValues = null; @@ -91,18 +84,12 @@ abstract class RedissonBaseIterator implements Iterator { continue; } if (res.getPos() == 0) { - free(firstValues); - free(lastValues); - finished = true; return false; } } - } else if (lastValues.removeAll(firstValues) - || (lastValues.isEmpty() && nextIterPos == 0)) { - free(firstValues); - free(lastValues); - + } else if (removeAll(lastValues, firstValues) + || (isEmpty(lastValues) && nextIterPos == 0)) { currentElementRemoved = false; client = null; @@ -124,21 +111,20 @@ abstract class RedissonBaseIterator implements Iterator { return lastIter.hasNext(); } - private List convert(List list) { - List result = new ArrayList(list.size()); - for (ScanObjectEntry entry : list) { - result.add(entry.getBuf()); - } - return result; + protected boolean isEmpty(List values) { + return values.isEmpty(); } - private void free(List list) { - if (list == null) { - return; - } - for (ByteBuf byteBuf : list) { - byteBuf.release(); + protected boolean removeAll(List lastValues, List firstValues) { + return lastValues.removeAll(firstValues); + } + + protected List convert(List list) { + List result = new ArrayList(list.size()); + for (ScanObjectEntry entry : list) { + result.add(entry.getHash()); } + return result; } protected boolean tryAgain() { diff --git a/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java index 969a051a0..a15f3b0df 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java @@ -25,8 +25,7 @@ import java.util.NoSuchElementException; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; - -import io.netty.buffer.ByteBuf; +import org.redisson.misc.HashValue; /** * @@ -38,8 +37,8 @@ import io.netty.buffer.ByteBuf; */ public abstract class RedissonBaseMapIterator implements Iterator { - private Map firstValues; - private Map lastValues; + private Map firstValues; + private Map lastValues; private Iterator> lastIter; protected long nextIterPos; protected RedisClient client; @@ -52,9 +51,6 @@ public abstract class RedissonBaseMapIterator implements Iterator { public boolean hasNext() { if (lastIter == null || !lastIter.hasNext()) { if (finished) { - free(firstValues); - free(lastValues); - currentElementRemoved = false; client = null; firstValues = null; @@ -68,9 +64,6 @@ public abstract class RedissonBaseMapIterator implements Iterator { } do { MapScanResult res = iterator(); - if (lastValues != null) { - free(lastValues); - } lastValues = convert(res.getMap()); client = res.getRedisClient(); @@ -95,18 +88,12 @@ public abstract class RedissonBaseMapIterator implements Iterator { continue; } if (res.getPos() == 0) { - free(firstValues); - free(lastValues); - finished = true; return false; } } } else if (lastValues.keySet().removeAll(firstValues.keySet()) || (lastValues.isEmpty() && nextIterPos == 0)) { - free(firstValues); - free(lastValues); - currentElementRemoved = false; client = null; @@ -135,20 +122,10 @@ public abstract class RedissonBaseMapIterator implements Iterator { protected abstract MapScanResult iterator(); - private void free(Map map) { - if (map == null) { - return; - } - for (Entry entry : map.entrySet()) { - entry.getKey().release(); - entry.getValue().release(); - } - } - - private Map convert(Map map) { - Map result = new HashMap(map.size()); + private Map convert(Map map) { + Map result = new HashMap(map.size()); for (Entry entry : map.entrySet()) { - result.put(entry.getKey().getBuf(), entry.getValue().getBuf()); + result.put(entry.getKey().getHash(), entry.getValue().getHash()); } return result; } @@ -185,7 +162,7 @@ public abstract class RedissonBaseMapIterator implements Iterator { throw new IllegalStateException(); } - firstValues.remove(entry.getKey().getBuf()); + firstValues.remove(entry.getKey().getHash()); lastIter.remove(); removeKey(); currentElementRemoved = true; diff --git a/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java b/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java index 2dcbdd3bb..1722c13c4 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java @@ -15,7 +15,6 @@ */ package org.redisson; -import java.net.InetSocketAddress; import java.util.AbstractMap; import java.util.HashMap; import java.util.Iterator; @@ -28,12 +27,19 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.misc.HashValue; -import io.netty.buffer.ByteBuf; - +/** + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + * @param map type + */ abstract class RedissonMultiMapIterator implements Iterator { - private Map firstKeys; + private Map firstKeys; private Iterator> keysIter; protected long keysIterPos = 0; @@ -76,15 +82,12 @@ abstract class RedissonMultiMapIterator implements Iterator { if (keysIterPos == 0 && firstKeys == null) { firstKeys = convert(res.getMap()); } else { - Map newValues = convert(res.getMap()); + Map newValues = convert(res.getMap()); if (newValues.equals(firstKeys)) { finished = true; - free(firstKeys); - free(newValues); firstKeys = null; return false; } - free(newValues); } keysIter = res.getMap().entrySet().iterator(); keysIterPos = res.getPos(); @@ -107,17 +110,10 @@ abstract class RedissonMultiMapIterator implements Iterator { protected abstract Iterator getIterator(String name); - private void free(Map map) { - for (Entry entry : map.entrySet()) { - entry.getKey().release(); - entry.getValue().release(); - } - } - - private Map convert(Map map) { - Map result = new HashMap(map.size()); + private Map convert(Map map) { + Map result = new HashMap(map.size()); for (Entry entry : map.entrySet()) { - result.put(entry.getKey().getBuf(), entry.getValue().getBuf()); + result.put(entry.getKey().getHash(), entry.getValue().getHash()); } return result; } diff --git a/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java index e513aa0c2..25b1bbbe9 100644 --- a/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java @@ -21,9 +21,10 @@ import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.misc.Hash; +import org.redisson.misc.HashValue; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; /** * @@ -59,13 +60,15 @@ public class MapScanCodec implements Codec { return new Decoder() { @Override public Object decode(ByteBuf buf, State state) throws IOException { - ByteBuf b = Unpooled.copiedBuffer(buf); + buf.markReaderIndex(); + long[] hash = Hash.hash128(buf); + buf.resetReaderIndex(); Codec c = delegate; if (mapValueCodec != null) { c = mapValueCodec; } Object val = c.getMapValueDecoder().decode(buf, state); - return new ScanObjectEntry(b, val); + return new ScanObjectEntry(new HashValue(hash), val); } }; } @@ -85,9 +88,11 @@ public class MapScanCodec implements Codec { return new Decoder() { @Override public Object decode(ByteBuf buf, State state) throws IOException { - ByteBuf b = Unpooled.copiedBuffer(buf); + buf.markReaderIndex(); + long[] hash = Hash.hash128(buf); + buf.resetReaderIndex(); Object val = delegate.getMapKeyDecoder().decode(buf, state); - return new ScanObjectEntry(b, val); + return new ScanObjectEntry(new HashValue(hash), val); } }; } diff --git a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java index 40c50cddf..a50855ca2 100644 --- a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java @@ -21,9 +21,10 @@ import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.misc.Hash; +import org.redisson.misc.HashValue; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; /** * @@ -43,9 +44,11 @@ public class ScanCodec implements Codec { return new Decoder() { @Override public Object decode(ByteBuf buf, State state) throws IOException { - ByteBuf b = Unpooled.copiedBuffer(buf); + buf.markReaderIndex(); + long[] hash = Hash.hash128(buf); + buf.resetReaderIndex(); Object val = delegate.getValueDecoder().decode(buf, state); - return new ScanObjectEntry(b, val); + return new ScanObjectEntry(new HashValue(hash), val); } }; } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java index b0dfb408a..687b77c9e 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScanObjectEntry.java @@ -15,7 +15,7 @@ */ package org.redisson.client.protocol.decoder; -import io.netty.buffer.ByteBuf; +import org.redisson.misc.HashValue; /** * @@ -24,16 +24,16 @@ import io.netty.buffer.ByteBuf; */ public class ScanObjectEntry { - private final ByteBuf buf; + private final HashValue hash; private final Object obj; - public ScanObjectEntry(ByteBuf buf, Object obj) { - this.buf = buf; + public ScanObjectEntry(HashValue hash, Object obj) { + this.hash = hash; this.obj = obj; } - public ByteBuf getBuf() { - return buf; + public HashValue getHash() { + return hash; } public Object getObj() { diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 706edbbba..eab855f5c 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -986,7 +986,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } else if (o instanceof ScanObjectEntry) { ScanObjectEntry keyScan = (ScanObjectEntry) o; Object obj = tryHandleReference0(keyScan.getObj()); - return obj != keyScan.getObj() ? (T) new ScanObjectEntry(keyScan.getBuf(), obj) : o; + return obj != keyScan.getObj() ? (T) new ScanObjectEntry(keyScan.getHash(), obj) : o; } else if (o instanceof Map.Entry) { Map.Entry old = (Map.Entry) o; Object key = tryHandleReference0(old.getKey()); diff --git a/redisson/src/main/java/org/redisson/misc/HashValue.java b/redisson/src/main/java/org/redisson/misc/HashValue.java new file mode 100644 index 000000000..a59a78a79 --- /dev/null +++ b/redisson/src/main/java/org/redisson/misc/HashValue.java @@ -0,0 +1,59 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import java.util.Arrays; + +/** + * + * @author Nikita Koksharov + * + */ +public class HashValue { + + private final long[] value; + + public HashValue(long[] hash) { + this.value = hash; + } + + public long[] getValue() { + return value; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(value); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HashValue other = (HashValue) obj; + if (!Arrays.equals(value, other.value)) + return false; + return true; + } + +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java index 98d7b06a8..213c10aa8 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java @@ -26,8 +26,8 @@ import org.reactivestreams.Subscription; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.misc.HashValue; -import io.netty.buffer.ByteBuf; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -54,7 +54,7 @@ public class RedissonMapReactiveIterator { public void subscribe(final Subscriber t) { t.onSubscribe(new ReactiveSubscription(this, t) { - private Map firstValues; + private Map firstValues; private long iterPos = 0; private RedisClient client; @@ -66,10 +66,10 @@ public class RedissonMapReactiveIterator { nextValues(); } - private Map convert(Map map) { - Map result = new HashMap(map.size()); + private Map convert(Map map) { + Map result = new HashMap(map.size()); for (Entry entry : map.entrySet()) { - result.put(entry.getKey().getBuf(), entry.getValue().getBuf()); + result.put(entry.getKey().getHash(), entry.getValue().getHash()); } return result; } @@ -83,23 +83,12 @@ public class RedissonMapReactiveIterator { s.request(Long.MAX_VALUE); } - private void free(Map map) { - if (map == null) { - return; - } - for (Entry entry : map.entrySet()) { - entry.getKey().release(); - entry.getValue().release(); - } - } - @Override public void onNext(MapScanResult res) { client = res.getRedisClient(); if (iterPos == 0 && firstValues == null) { firstValues = convert(res.getMap()); } else if (convert(res.getMap()).equals(firstValues)) { - free(firstValues); m.onComplete(); currentIndex = 0; return; diff --git a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java index 3d42febd4..c32406f60 100644 --- a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -25,8 +24,8 @@ import org.reactivestreams.Subscription; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.misc.HashValue; -import io.netty.buffer.ByteBuf; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -42,8 +41,8 @@ public abstract class SetReactiveIterator extends Stream { public void subscribe(final Subscriber t) { t.onSubscribe(new ReactiveSubscription(this, t) { - private List firstValues; - private List lastValues; + private List firstValues; + private List lastValues; private long nextIterPos; private RedisClient client; @@ -72,9 +71,6 @@ public abstract class SetReactiveIterator extends Stream { @Override public void onNext(ListScanResult res) { if (finished) { - free(firstValues); - free(lastValues); - client = null; firstValues = null; lastValues = null; @@ -83,9 +79,6 @@ public abstract class SetReactiveIterator extends Stream { } long prevIterPos = nextIterPos; - if (lastValues != null) { - free(lastValues); - } lastValues = convert(res.getValues()); client = res.getRedisClient(); @@ -111,9 +104,6 @@ public abstract class SetReactiveIterator extends Stream { } } } else if (lastValues.removeAll(firstValues)) { - free(firstValues); - free(lastValues); - client = null; firstValues = null; lastValues = null; @@ -152,19 +142,10 @@ public abstract class SetReactiveIterator extends Stream { }); } - private void free(List list) { - if (list == null) { - return; - } - for (ByteBuf byteBuf : list) { - byteBuf.release(); - } - } - - private List convert(List list) { - List result = new ArrayList(list.size()); + private List convert(List list) { + List result = new ArrayList(list.size()); for (ScanObjectEntry entry : list) { - result.add(entry.getBuf()); + result.add(entry.getHash()); } return result; } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 8400d4770..0f6507289 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -39,6 +39,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.codec.SerializationCodec; import org.redisson.config.Config; import org.redisson.connection.ConnectionListener; +import org.redisson.misc.HashValue; import io.netty.buffer.Unpooled; @@ -114,7 +115,7 @@ public class RedissonTest { ListScanResult iterator(RedisClient client, long nextIterPos) { i++; if (i == 1) { - return new ListScanResult(14L, Arrays.asList(new ScanObjectEntry(Unpooled.wrappedBuffer(new byte[] {1}), 1))); + return new ListScanResult(14L, Arrays.asList(new ScanObjectEntry(new HashValue(new long[]{1L}) , 1))); } if (i == 2) { return new ListScanResult(7L, Collections.emptyList());