diff --git a/redisson/src/main/java/org/redisson/RedissonBinaryStream.java b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java index 000c42208..262d63a6b 100644 --- a/redisson/src/main/java/org/redisson/RedissonBinaryStream.java +++ b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java @@ -18,16 +18,21 @@ package org.redisson; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Arrays; import org.redisson.api.RBinaryStream; +import org.redisson.api.RFuture; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.misc.RPromise; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; /** * @@ -40,14 +45,23 @@ public class RedissonBinaryStream extends RedissonBucket implements RBin @Override public void write(int b) throws IOException { - get(commandExecutor.writeAsync(getName(), codec, RedisCommands.APPEND, getName(), new byte[] {(byte)b})); + writeBytes(new byte[] {(byte)b}); + } + + private void writeBytes(byte[] bytes) { + get(writeAsync(bytes)); } @Override public void write(byte[] b, int off, int len) throws IOException { - byte[] dest = new byte[len]; - System.arraycopy(b, off, dest, 0, len); - get(commandExecutor.writeAsync(getName(), codec, RedisCommands.APPEND, getName(), dest)); + byte[] dest; + if (b.length == len && off == 0) { + dest = b; + } else { + dest = new byte[len]; + System.arraycopy(b, off, dest, 0, len); + } + writeBytes(dest); } } @@ -55,43 +69,172 @@ public class RedissonBinaryStream extends RedissonBucket implements RBin class RedissonInputStream extends InputStream { private int index; + private int mark; + + @Override + public long skip(long n) throws IOException { + long k = size() - index; + if (n < k) { + k = n < 0 ? 0 : n; + } + + index += k; + return k; + } + + @Override + public void mark(int readlimit) { + mark = index; + } + + @Override + public void reset() throws IOException { + index = mark; + } + + @Override + public int available() throws IOException { + return (int)(size() - index); + } + + @Override + public boolean markSupported() { + return true; + } @Override public int read() throws IOException { - byte[] result = (byte[])get(commandExecutor.readAsync(getName(), codec, RedisCommands.GETRANGE, getName(), index, index)); - if (result.length == 0) { + byte[] b = new byte[1]; + int len = read(b); + if (len == -1) { return -1; } - index++; - return result[0]; + return b[0] & 0xff; } @Override public int read(final byte[] b, final int off, final int len) throws IOException { + if (len == 0) { + return 0; + } if (b == null) { throw new NullPointerException(); } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } - return (Integer)get(commandExecutor.readAsync(getName(), codec, new RedisCommand("GETRANGE", new Decoder() { - @Override - public Integer decode(ByteBuf buf, State state) { - if (buf.readableBytes() == 0) { - return -1; - } - int readBytes = Math.min(buf.readableBytes(), len); - buf.readBytes(b, off, readBytes); - index += readBytes; - return readBytes; - } - }), getName(), index, index + b.length - 1)); + return get(commandExecutor.evalReadAsync(getName(), codec, new RedisCommand("EVAL", new Decoder() { + @Override + public Integer decode(ByteBuf buf, State state) { + if (buf.readableBytes() == 0) { + return -1; + } + int readBytes = Math.min(buf.readableBytes(), len); + buf.readBytes(b, off, readBytes); + index += readBytes; + return readBytes; + } + }), + "local parts = redis.call('get', KEYS[2]); " + + "if parts ~= false then " + + "local startPart = math.floor(tonumber(ARGV[1])/536870912); " + + "local endPart = math.floor(tonumber(ARGV[2])/536870912); " + + "local startPartName = KEYS[1]; " + + "local endPartName = KEYS[1]; " + + + "if startPart > 0 then " + + "startPartName = KEYS[1] .. ':' .. startPart; " + + "end; " + + "if endPart > 0 then " + + "endPartName = KEYS[1] .. ':' .. endPart; " + + "end; " + + + "if startPartName ~= endPartName then " + + "local startIndex = tonumber(ARGV[1]) - startPart*536870912; " + + "local endIndex = tonumber(ARGV[2]) - endPart*536870912; " + + "local result = redis.call('getrange', startPartName, startIndex, 536870911); " + + "result = result .. redis.call('getrange', endPartName, 0, endIndex-1); " + + "return result; " + + "end; " + + + "local startIndex = tonumber(ARGV[1]) - startPart*536870912; " + + "local endIndex = tonumber(ARGV[2]) - endPart*536870912; " + + "return redis.call('getrange', startPartName, startIndex, endIndex);" + + "end;" + + "return redis.call('getrange', KEYS[1], ARGV[1], ARGV[2]);", + Arrays.asList(getName(), getPartsName()), index, index + len - 1)); } - + } protected RedissonBinaryStream(CommandAsyncExecutor connectionManager, String name) { super(ByteArrayCodec.INSTANCE, connectionManager, name); } + + @Override + public RFuture sizeAsync() { + return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_LONG, + "local parts = redis.call('get', KEYS[2]); " + + "local lastPartName = KEYS[1];" + + "if parts ~= false then " + + "lastPartName = KEYS[1] .. ':' .. (tonumber(parts)-1);" + + "local lastPartSize = redis.call('strlen', lastPartName);" + + "return ((tonumber(parts)-1) * 536870912) + lastPartSize;" + + "end;" + + "return redis.call('strlen', lastPartName);", + Arrays.asList(getName(), getPartsName())); + } + private RFuture writeAsync(byte[] bytes) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, + "local parts = redis.call('get', KEYS[2]); " + + "local lastPartName = KEYS[1];" + + "if parts ~= false then " + + "lastPartName = KEYS[1] .. ':' .. (tonumber(parts)-1);" + + "end;" + + "local lastPartSize = redis.call('strlen', lastPartName);" + + "if lastPartSize == 0 then " + + "redis.call('append', lastPartName, ARGV[1]); " + + "return; " + + "end;" + + + "local chunkSize = 536870912 - lastPartSize; " + + "local arraySize = string.len(ARGV[1]); " + + "if chunkSize > 0 then " + + "if chunkSize >= arraySize then " + + "redis.call('append', lastPartName, ARGV[1]); " + + "return; " + + "else " + + "local chunk = string.sub(ARGV[1], 1, chunkSize);" + + "redis.call('append', lastPartName, chunk); " + + + "if parts == false then " + + "parts = 1;" + + "redis.call('incrby', KEYS[2], 2); " + + "else " + + "redis.call('incrby', KEYS[2], 1); " + + "end; " + + + "local newPartName = KEYS[1] .. ':' .. parts; " + + "chunk = string.sub(ARGV[1], -(arraySize - chunkSize));" + + "redis.call('append', newPartName, chunk); " + + "end; " + + "else " + + "if parts == false then " + + "parts = 1;" + + "redis.call('incrby', KEYS[2], 2); " + + "else " + + "redis.call('incrby', KEYS[2], 1); " + + "end; " + + + "local newPartName = KEYS[1] .. ':' .. parts; " + + "local chunk = string.sub(ARGV[1], -(arraySize - chunkSize));" + + "redis.call('append', newPartName, ARGV[1]); " + + "end; ", + Arrays.asList(getName(), getPartsName()), bytes); + } + @Override public InputStream getInputStream() { return new RedissonInputStream(); @@ -102,4 +245,56 @@ public class RedissonBinaryStream extends RedissonBucket implements RBin return new RedissonOutputStream(); } + @Override + public RFuture setAsync(byte[] value) { + if (value.length > 512*1024*1024) { + RPromise result = newPromise(); + int chunkSize = 10*1024*1024; + write(value, result, chunkSize, 0); + return result; + } + + return super.setAsync(value); + } + + private void write(final byte[] value, final RPromise result, final int chunkSize, final int i) { + final int len = Math.min(value.length - i*chunkSize, chunkSize); + byte[] bytes = Arrays.copyOfRange(value, i*chunkSize, i*chunkSize + len); + writeAsync(bytes).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + int j = i + 1; + if (j*chunkSize > value.length) { + result.trySuccess(null); + } else { + write(value, result, chunkSize, j); + } + } + }); + } + + private String getPartsName() { + return getName() + ":parts"; + } + + @Override + public RFuture deleteAsync() { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_AMOUNT, + "local parts = redis.call('get', KEYS[2]); " + + "local names = {KEYS[1], KEYS[2]};" + + "if parts ~= false then " + + "for i = 1, tonumber(parts)-1, 1 do " + + "table.insert(names, KEYS[1] .. ':' .. i); " + + "end; " + + "end;" + + "return redis.call('del', unpack(names));", + Arrays.asList(getName(), getPartsName())); + + } + } diff --git a/redisson/src/main/java/org/redisson/RedissonBucket.java b/redisson/src/main/java/org/redisson/RedissonBucket.java index 62279ffd5..66c852174 100644 --- a/redisson/src/main/java/org/redisson/RedissonBucket.java +++ b/redisson/src/main/java/org/redisson/RedissonBucket.java @@ -24,6 +24,12 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +/** + * + * @author Nikita Koksharov + * + * @param value type + */ public class RedissonBucket extends RedissonExpirable implements RBucket { protected RedissonBucket(CommandAsyncExecutor connectionManager, String name) { @@ -97,12 +103,12 @@ public class RedissonBucket extends RedissonExpirable implements RBucket { } @Override - public int size() { + public long size() { return get(sizeAsync()); } @Override - public RFuture sizeAsync() { + public RFuture sizeAsync() { return commandExecutor.readAsync(getName(), codec, RedisCommands.STRLEN, getName()); } diff --git a/redisson/src/main/java/org/redisson/api/RBinaryStream.java b/redisson/src/main/java/org/redisson/api/RBinaryStream.java index 2bf12a488..f9ff67e69 100644 --- a/redisson/src/main/java/org/redisson/api/RBinaryStream.java +++ b/redisson/src/main/java/org/redisson/api/RBinaryStream.java @@ -19,15 +19,27 @@ import java.io.InputStream; import java.io.OutputStream; /** - * Binary stream holder + * Binary stream holder. Maximum size of stream is limited by available memory of Redis master node. * * @author Nikita Koksharov * */ public interface RBinaryStream extends RBucket { + /** + * Returns inputStream which reads binary stream. + * This stream isn't thread-safe. + * + * @return stream + */ InputStream getInputStream(); - + + /** + * Returns outputStream which writes binary stream. + * This stream isn't thread-safe. + * + * @return stream + */ OutputStream getOutputStream(); } diff --git a/redisson/src/main/java/org/redisson/api/RBucket.java b/redisson/src/main/java/org/redisson/api/RBucket.java index 8384082ee..f05f42f12 100644 --- a/redisson/src/main/java/org/redisson/api/RBucket.java +++ b/redisson/src/main/java/org/redisson/api/RBucket.java @@ -18,7 +18,7 @@ package org.redisson.api; import java.util.concurrent.TimeUnit; /** - * Any object holder + * Any object holder. Max size of object is 512MB * * @author Nikita Koksharov * @@ -31,7 +31,7 @@ public interface RBucket extends RExpirable, RBucketAsync { * * @return object size */ - int size(); + long size(); V get(); diff --git a/redisson/src/main/java/org/redisson/api/RBucketAsync.java b/redisson/src/main/java/org/redisson/api/RBucketAsync.java index 23508a2d7..d3658fa2c 100644 --- a/redisson/src/main/java/org/redisson/api/RBucketAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBucketAsync.java @@ -31,7 +31,7 @@ public interface RBucketAsync extends RExpirableAsync { * * @return object size */ - RFuture sizeAsync(); + RFuture sizeAsync(); RFuture getAsync(); diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 80992c94e..6449e3457 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -62,6 +62,11 @@ import org.redisson.client.protocol.decoder.StringReplayDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; import org.redisson.cluster.ClusterNodeInfo; +/** + * + * @author Nikita Koksharov + * + */ public interface RedisCommands { RedisStrictCommand GEOADD = new RedisStrictCommand("GEOADD", 4); @@ -75,7 +80,7 @@ public interface RedisCommands { RedisStrictCommand GETBIT = new RedisStrictCommand("GETBIT", new BooleanReplayConvertor()); RedisStrictCommand BITS_SIZE = new RedisStrictCommand("STRLEN", new BitsSizeReplayConvertor()); - RedisStrictCommand STRLEN = new RedisStrictCommand("STRLEN", new IntegerReplayConvertor()); + RedisStrictCommand STRLEN = new RedisStrictCommand("STRLEN"); RedisStrictCommand BITCOUNT = new RedisStrictCommand("BITCOUNT"); RedisStrictCommand BITPOS = new RedisStrictCommand("BITPOS", new IntegerReplayConvertor()); RedisStrictCommand SETBIT_VOID = new RedisStrictCommand("SETBIT", new VoidReplayConvertor()); diff --git a/redisson/src/test/java/org/redisson/RedissonBinaryStreamTest.java b/redisson/src/test/java/org/redisson/RedissonBinaryStreamTest.java index 6e3141655..4214843f7 100644 --- a/redisson/src/test/java/org/redisson/RedissonBinaryStreamTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBinaryStreamTest.java @@ -1,19 +1,122 @@ package org.redisson; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.math.BigInteger; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; import org.junit.Test; import org.redisson.api.RBinaryStream; public class RedissonBinaryStreamTest extends BaseTest { + @Test + public void testEmptyRead() throws IOException { + RBinaryStream stream = redisson.getBinaryStream("test"); + assertThat(stream.getInputStream().read()).isEqualTo(-1); + } + + private void testLimit(int sizeInMBs, int chunkSize) throws IOException, NoSuchAlgorithmException { + RBinaryStream stream = redisson.getBinaryStream("test"); + + MessageDigest hash = MessageDigest.getInstance("SHA-1"); + hash.reset(); + + for (int i = 0; i < sizeInMBs; i++) { + byte[] bytes = new byte[chunkSize]; + ThreadLocalRandom.current().nextBytes(bytes); + hash.update(bytes); + stream.getOutputStream().write(bytes); + } + + String writtenDataHash = new BigInteger(1, hash.digest()).toString(16); + + hash.reset(); + InputStream s = stream.getInputStream(); + long readBytesTotal = 0; + while (true) { + byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(0, chunkSize)]; + int readBytes = s.read(bytes); + if (readBytes == -1) { + break; + } + if (readBytes < bytes.length) { + bytes = Arrays.copyOf(bytes, readBytes); + } + hash.update(bytes); + readBytesTotal += readBytes; + } + String readDataHash = new BigInteger(1, hash.digest()).toString(16); + + assertThat(writtenDataHash).isEqualTo(readDataHash); + assertThat(readBytesTotal).isEqualTo(sizeInMBs*chunkSize); + assertThat(stream.size()).isEqualTo(sizeInMBs*chunkSize); + + assertThat(stream.size()).isEqualTo(sizeInMBs*chunkSize); + assertThat(redisson.getBucket("test").isExists()).isTrue(); + if (sizeInMBs*chunkSize <= 512*1024*1024) { + assertThat(redisson.getBucket("test:parts").isExists()).isFalse(); + assertThat(redisson.getBucket("test:1").isExists()).isFalse(); + } else { + int parts = (sizeInMBs*chunkSize)/(512*1024*1024); + for (int i = 1; i < parts-1; i++) { + assertThat(redisson.getBucket("test:" + i).isExists()).isTrue(); + } + } + } + + + @Test + public void testLimit512by1024() throws IOException, NoSuchAlgorithmException { + testLimit(512, 1024*1024); + } + + @Test + public void testLimit1024By1000() throws IOException, NoSuchAlgorithmException { + testLimit(1024, 1000*1000); + } + + @Test + public void testSet100() { + RBinaryStream stream = redisson.getBinaryStream("test"); + + byte[] bytes = new byte[100*1024*1024]; + ThreadLocalRandom.current().nextBytes(bytes); + stream.set(bytes); + + assertThat(stream.size()).isEqualTo(bytes.length); + assertThat(stream.get()).isEqualTo(bytes); + } + + @Test + public void testSet1024() { + RBinaryStream stream = redisson.getBinaryStream("test"); + + byte[] bytes = new byte[1024*1024*1024]; + ThreadLocalRandom.current().nextBytes(bytes); + stream.set(bytes); + + assertThat(stream.size()).isEqualTo(bytes.length); + assertThat(redisson.getBucket("test:parts").isExists()).isTrue(); + assertThat(redisson.getBucket("test").size()).isEqualTo(512*1024*1024); + assertThat(redisson.getBucket("test:1").size()).isEqualTo(bytes.length - 512*1024*1024); + } + + @Test + public void testLimit1024By1024() throws IOException, NoSuchAlgorithmException { + testLimit(1024, 1024*1024); + } + @Test public void testRead() throws IOException { RBinaryStream stream = redisson.getBinaryStream("test"); - byte[] value = {1, 2, 3, 4, 5, 6}; + byte[] value = {1, 2, 3, 4, 5, (byte)0xFF}; stream.set(value); InputStream s = stream.getInputStream(); @@ -29,7 +132,7 @@ public class RedissonBinaryStreamTest extends BaseTest { i++; } - assertThat(value).isEqualTo(readValue); + assertThat(readValue).isEqualTo(value); } @Test