Split the binary stream of an RBinaryStream object into multiple objects if it exceeds 512MB. #669

pull/697/head
Nikita 8 years ago
parent fe8739194f
commit ca1a320dca

@@ -18,16 +18,21 @@ package org.redisson;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import org.redisson.api.RBinaryStream;
import org.redisson.api.RFuture;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
@@ -40,14 +45,23 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin
@Override
public void write(int b) throws IOException {
get(commandExecutor.writeAsync(getName(), codec, RedisCommands.APPEND, getName(), new byte[] {(byte)b}));
writeBytes(new byte[] {(byte)b});
}
private void writeBytes(byte[] bytes) {
get(writeAsync(bytes));
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
byte[] dest = new byte[len];
System.arraycopy(b, off, dest, 0, len);
get(commandExecutor.writeAsync(getName(), codec, RedisCommands.APPEND, getName(), dest));
byte[] dest;
if (b.length == len && off == 0) {
dest = b;
} else {
dest = new byte[len];
System.arraycopy(b, off, dest, 0, len);
}
writeBytes(dest);
}
}
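To make the new write path concrete, here is a minimal usage sketch (editorial, not part of the patch; it assumes a Redis server on localhost:6379 and Redisson's default configuration, and the class name is illustrative):

    import java.io.OutputStream;
    import org.redisson.Redisson;
    import org.redisson.api.RBinaryStream;
    import org.redisson.api.RedissonClient;

    public class BinaryStreamWriteExample {
        public static void main(String[] args) throws Exception {
            RedissonClient redisson = Redisson.create();      // default config, localhost:6379
            RBinaryStream stream = redisson.getBinaryStream("example");
            OutputStream os = stream.getOutputStream();
            os.write(42);                                     // single byte: routed through writeBytes()
            os.write(new byte[]{0, 1, 2, 3}, 1, 2);           // only the requested range {1, 2} is copied and appended
            redisson.shutdown();
        }
    }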
@@ -55,43 +69,172 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin
class RedissonInputStream extends InputStream {
private int index;
private int mark;
@Override
public long skip(long n) throws IOException {
long k = size() - index;
if (n < k) {
k = n < 0 ? 0 : n;
}
index += k;
return k;
}
@Override
public void mark(int readlimit) {
mark = index;
}
@Override
public void reset() throws IOException {
index = mark;
}
@Override
public int available() throws IOException {
return (int)(size() - index);
}
@Override
public boolean markSupported() {
return true;
}
@Override
public int read() throws IOException {
byte[] result = (byte[])get(commandExecutor.readAsync(getName(), codec, RedisCommands.GETRANGE, getName(), index, index));
if (result.length == 0) {
byte[] b = new byte[1];
int len = read(b);
if (len == -1) {
return -1;
}
index++;
return result[0];
return b[0] & 0xff;
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
if (len == 0) {
return 0;
}
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
return (Integer)get(commandExecutor.readAsync(getName(), codec, new RedisCommand<Integer>("GETRANGE", new Decoder<Integer>() {
@Override
public Integer decode(ByteBuf buf, State state) {
if (buf.readableBytes() == 0) {
return -1;
}
int readBytes = Math.min(buf.readableBytes(), len);
buf.readBytes(b, off, readBytes);
index += readBytes;
return readBytes;
}
}), getName(), index, index + b.length - 1));
return get(commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<Integer>("EVAL", new Decoder<Integer>() {
@Override
public Integer decode(ByteBuf buf, State state) {
if (buf.readableBytes() == 0) {
return -1;
}
int readBytes = Math.min(buf.readableBytes(), len);
buf.readBytes(b, off, readBytes);
index += readBytes;
return readBytes;
}
}),
"local parts = redis.call('get', KEYS[2]); "
+ "if parts ~= false then "
+ "local startPart = math.floor(tonumber(ARGV[1])/536870912); "
+ "local endPart = math.floor(tonumber(ARGV[2])/536870912); "
+ "local startPartName = KEYS[1]; "
+ "local endPartName = KEYS[1]; "
+ "if startPart > 0 then "
+ "startPartName = KEYS[1] .. ':' .. startPart; "
+ "end; "
+ "if endPart > 0 then "
+ "endPartName = KEYS[1] .. ':' .. endPart; "
+ "end; "
+ "if startPartName ~= endPartName then "
+ "local startIndex = tonumber(ARGV[1]) - startPart*536870912; "
+ "local endIndex = tonumber(ARGV[2]) - endPart*536870912; "
+ "local result = redis.call('getrange', startPartName, startIndex, 536870911); "
+ "result = result .. redis.call('getrange', endPartName, 0, endIndex-1); "
+ "return result; "
+ "end; "
+ "local startIndex = tonumber(ARGV[1]) - startPart*536870912; "
+ "local endIndex = tonumber(ARGV[2]) - endPart*536870912; "
+ "return redis.call('getrange', startPartName, startIndex, endIndex);"
+ "end;"
+ "return redis.call('getrange', KEYS[1], ARGV[1], ARGV[2]);",
Arrays.<Object>asList(getName(), getPartsName()), index, index + len - 1));
}
}
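// Editor's note (illustrative, not part of this patch): the Lua script above maps a byte
// offset to a 512MB (536870912-byte) part; part 0 keeps the base key name, part k > 0
// uses "<name>:k". The same arithmetic expressed in Java:
private static final long PART_SIZE = 536870912L; // 512MB, the Redis string value limit

private static String partNameFor(String baseName, long byteOffset) {
    long part = byteOffset / PART_SIZE;                  // index of the part holding this offset
    return part == 0 ? baseName : baseName + ":" + part; // partNameFor("test", 536870912L) -> "test:1"
}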
protected RedissonBinaryStream(CommandAsyncExecutor connectionManager, String name) {
super(ByteArrayCodec.INSTANCE, connectionManager, name);
}
@Override
public RFuture<Long> sizeAsync() {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local parts = redis.call('get', KEYS[2]); "
+ "local lastPartName = KEYS[1];"
+ "if parts ~= false then "
+ "lastPartName = KEYS[1] .. ':' .. (tonumber(parts)-1);"
+ "local lastPartSize = redis.call('strlen', lastPartName);"
+ "return ((tonumber(parts)-1) * 536870912) + lastPartSize;"
+ "end;"
+ "return redis.call('strlen', lastPartName);",
Arrays.<Object>asList(getName(), getPartsName()));
}
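// Editor's note (illustrative, not part of this patch): the size script above evaluates to
// (parts - 1) * 536870912 + STRLEN(lastPart) when the ":parts" counter exists, otherwise
// to STRLEN(name). The same computation in Java:
private static long totalSize(Long partsCounter, long lastPartLength) {
    if (partsCounter == null) {                          // no ":parts" key: single unsplit value
        return lastPartLength;                           // STRLEN of the base key
    }
    return (partsCounter - 1) * 536870912L + lastPartLength;
}
// e.g. a 1 GiB stream is stored as parts = 2, so totalSize(2L, 536870912L) == 1073741824L.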
private RFuture<Void> writeAsync(byte[] bytes) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local parts = redis.call('get', KEYS[2]); "
+ "local lastPartName = KEYS[1];"
+ "if parts ~= false then "
+ "lastPartName = KEYS[1] .. ':' .. (tonumber(parts)-1);"
+ "end;"
+ "local lastPartSize = redis.call('strlen', lastPartName);"
+ "if lastPartSize == 0 then "
+ "redis.call('append', lastPartName, ARGV[1]); "
+ "return; "
+ "end;"
+ "local chunkSize = 536870912 - lastPartSize; "
+ "local arraySize = string.len(ARGV[1]); "
+ "if chunkSize > 0 then "
+ "if chunkSize >= arraySize then "
+ "redis.call('append', lastPartName, ARGV[1]); "
+ "return; "
+ "else "
+ "local chunk = string.sub(ARGV[1], 1, chunkSize);"
+ "redis.call('append', lastPartName, chunk); "
+ "if parts == false then "
+ "parts = 1;"
+ "redis.call('incrby', KEYS[2], 2); "
+ "else "
+ "redis.call('incrby', KEYS[2], 1); "
+ "end; "
+ "local newPartName = KEYS[1] .. ':' .. parts; "
+ "chunk = string.sub(ARGV[1], -(arraySize - chunkSize));"
+ "redis.call('append', newPartName, chunk); "
+ "end; "
+ "else "
+ "if parts == false then "
+ "parts = 1;"
+ "redis.call('incrby', KEYS[2], 2); "
+ "else "
+ "redis.call('incrby', KEYS[2], 1); "
+ "end; "
+ "local newPartName = KEYS[1] .. ':' .. parts; "
+ "local chunk = string.sub(ARGV[1], -(arraySize - chunkSize));"
+ "redis.call('append', newPartName, ARGV[1]); "
+ "end; ",
Arrays.<Object>asList(getName(), getPartsName()), bytes);
}
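// Editor's note (illustrative, not part of this patch): the append script above fills the last
// part up to the 512MB boundary and spills the remainder into a new "<name>:<parts>" key,
// incrementing the ":parts" counter (by 2 on the first split, so the counter covers both the
// base key and the new ":1" part). A Java sketch of how a buffer is divided at that boundary:
private static byte[][] splitAtPartBoundary(int lastPartSize, byte[] data) {
    int room = 536870912 - lastPartSize;                        // free space left in the last part
    if (lastPartSize == 0 || room >= data.length) {
        return new byte[][] {data, new byte[0]};                // fits entirely into the current part
    }
    byte[] head = Arrays.copyOfRange(data, 0, room);            // tops up the current part (may be empty if already full)
    byte[] tail = Arrays.copyOfRange(data, room, data.length);  // appended to the new part
    return new byte[][] {head, tail};
}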
@Override
public InputStream getInputStream() {
return new RedissonInputStream();
@@ -102,4 +245,56 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin
return new RedissonOutputStream();
}
@Override
public RFuture<Void> setAsync(byte[] value) {
if (value.length > 512*1024*1024) {
RPromise<Void> result = newPromise();
int chunkSize = 10*1024*1024;
write(value, result, chunkSize, 0);
return result;
}
return super.setAsync(value);
}
private void write(final byte[] value, final RPromise<Void> result, final int chunkSize, final int i) {
final int len = Math.min(value.length - i*chunkSize, chunkSize);
byte[] bytes = Arrays.copyOfRange(value, i*chunkSize, i*chunkSize + len);
writeAsync(bytes).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
int j = i + 1;
if (j*chunkSize > value.length) {
result.trySuccess(null);
} else {
write(value, result, chunkSize, j);
}
}
});
}
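// Editor's note (illustrative, not part of this patch): setAsync streams oversized values in
// 10MB slices, chaining each writeAsync call from the listener of the previous one, so roughly
// ceil(value.length / chunkSize) sequential appends are issued:
private static int approxChunkWrites(int valueLength, int chunkSize) {
    return (valueLength + chunkSize - 1) / chunkSize;    // ceiling division
}
// e.g. approxChunkWrites(1024*1024*1024, 10*1024*1024) == 103 slices for a 1 GiB value.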
private String getPartsName() {
return getName() + ":parts";
}
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_AMOUNT,
"local parts = redis.call('get', KEYS[2]); "
+ "local names = {KEYS[1], KEYS[2]};"
+ "if parts ~= false then "
+ "for i = 1, tonumber(parts)-1, 1 do "
+ "table.insert(names, KEYS[1] .. ':' .. i); "
+ "end; "
+ "end;"
+ "return redis.call('del', unpack(names));",
Arrays.<Object>asList(getName(), getPartsName()));
}
}
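For orientation (editorial sketch, not part of the patch): deleteAsync gathers the base key, the ":parts" counter, and every additional part key into a single DEL call. In Java terms, the deleted key set for a stream named name with a parts counter value parts (both variable names are illustrative) would be:

    List<String> keys = new ArrayList<>();
    keys.add(name);                                  // base part (KEYS[1])
    keys.add(name + ":parts");                       // parts counter (KEYS[2])
    for (int i = 1; parts != null && i <= parts - 1; i++) {
        keys.add(name + ":" + i);                    // additional parts
    }
    // e.g. name = "test", parts = 2  ->  [test, test:parts, test:1]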

@@ -24,6 +24,12 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
protected RedissonBucket(CommandAsyncExecutor connectionManager, String name) {
@@ -97,12 +103,12 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
}
@Override
public int size() {
public long size() {
return get(sizeAsync());
}
@Override
public RFuture<Integer> sizeAsync() {
public RFuture<Long> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.STRLEN, getName());
}

@@ -19,15 +19,27 @@ import java.io.InputStream;
import java.io.OutputStream;
/**
* Binary stream holder
* Binary stream holder. The maximum stream size is limited by the available memory of the Redis master node.
*
* @author Nikita Koksharov
*
*/
public interface RBinaryStream extends RBucket<byte[]> {
/**
* Returns an InputStream for reading the binary stream.
* The returned stream isn't thread-safe.
*
* @return stream
*/
InputStream getInputStream();
/**
* Returns an OutputStream for writing to the binary stream.
* The returned stream isn't thread-safe.
*
* @return stream
*/
OutputStream getOutputStream();
}
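A matching read-side usage sketch (editorial, not part of the patch; same client-setup assumptions as the write example above, class name illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import org.redisson.Redisson;
    import org.redisson.api.RBinaryStream;
    import org.redisson.api.RedissonClient;

    public class BinaryStreamReadExample {
        public static void main(String[] args) throws Exception {
            RedissonClient redisson = Redisson.create();      // assumes Redis on localhost:6379
            RBinaryStream stream = redisson.getBinaryStream("example");
            stream.set(new byte[]{1, 2, 3, 4, 5});

            InputStream in = stream.getInputStream();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);                         // read() may return fewer bytes than requested
            }
            System.out.println(out.toByteArray().length);     // 5
            redisson.shutdown();
        }
    }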

@@ -18,7 +18,7 @@ package org.redisson.api;
import java.util.concurrent.TimeUnit;
/**
* Any object holder
* Any object holder. The maximum object size is 512MB.
*
* @author Nikita Koksharov
*
@@ -31,7 +31,7 @@ public interface RBucket<V> extends RExpirable, RBucketAsync<V> {
*
* @return object size
*/
int size();
long size();
V get();

@@ -31,7 +31,7 @@ public interface RBucketAsync<V> extends RExpirableAsync {
*
* @return object size
*/
RFuture<Integer> sizeAsync();
RFuture<Long> sizeAsync();
RFuture<V> getAsync();

@@ -62,6 +62,11 @@ import org.redisson.client.protocol.decoder.StringReplayDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.cluster.ClusterNodeInfo;
/**
*
* @author Nikita Koksharov
*
*/
public interface RedisCommands {
RedisStrictCommand<Long> GEOADD = new RedisStrictCommand<Long>("GEOADD", 4);
@@ -75,7 +80,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> GETBIT = new RedisStrictCommand<Boolean>("GETBIT", new BooleanReplayConvertor());
RedisStrictCommand<Integer> BITS_SIZE = new RedisStrictCommand<Integer>("STRLEN", new BitsSizeReplayConvertor());
RedisStrictCommand<Integer> STRLEN = new RedisStrictCommand<Integer>("STRLEN", new IntegerReplayConvertor());
RedisStrictCommand<Long> STRLEN = new RedisStrictCommand<Long>("STRLEN");
RedisStrictCommand<Long> BITCOUNT = new RedisStrictCommand<Long>("BITCOUNT");
RedisStrictCommand<Integer> BITPOS = new RedisStrictCommand<Integer>("BITPOS", new IntegerReplayConvertor());
RedisStrictCommand<Void> SETBIT_VOID = new RedisStrictCommand<Void>("SETBIT", new VoidReplayConvertor());

@@ -1,19 +1,122 @@
package org.redisson;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.Test;
import org.redisson.api.RBinaryStream;
public class RedissonBinaryStreamTest extends BaseTest {
@Test
public void testEmptyRead() throws IOException {
RBinaryStream stream = redisson.getBinaryStream("test");
assertThat(stream.getInputStream().read()).isEqualTo(-1);
}
private void testLimit(int sizeInMBs, int chunkSize) throws IOException, NoSuchAlgorithmException {
RBinaryStream stream = redisson.getBinaryStream("test");
MessageDigest hash = MessageDigest.getInstance("SHA-1");
hash.reset();
for (int i = 0; i < sizeInMBs; i++) {
byte[] bytes = new byte[chunkSize];
ThreadLocalRandom.current().nextBytes(bytes);
hash.update(bytes);
stream.getOutputStream().write(bytes);
}
String writtenDataHash = new BigInteger(1, hash.digest()).toString(16);
hash.reset();
InputStream s = stream.getInputStream();
long readBytesTotal = 0;
while (true) {
byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(0, chunkSize)];
int readBytes = s.read(bytes);
if (readBytes == -1) {
break;
}
if (readBytes < bytes.length) {
bytes = Arrays.copyOf(bytes, readBytes);
}
hash.update(bytes);
readBytesTotal += readBytes;
}
String readDataHash = new BigInteger(1, hash.digest()).toString(16);
assertThat(writtenDataHash).isEqualTo(readDataHash);
assertThat(readBytesTotal).isEqualTo(sizeInMBs*chunkSize);
assertThat(stream.size()).isEqualTo(sizeInMBs*chunkSize);
assertThat(stream.size()).isEqualTo(sizeInMBs*chunkSize);
assertThat(redisson.getBucket("test").isExists()).isTrue();
if (sizeInMBs*chunkSize <= 512*1024*1024) {
assertThat(redisson.getBucket("test:parts").isExists()).isFalse();
assertThat(redisson.getBucket("test:1").isExists()).isFalse();
} else {
int parts = (sizeInMBs*chunkSize)/(512*1024*1024);
for (int i = 1; i < parts-1; i++) {
assertThat(redisson.getBucket("test:" + i).isExists()).isTrue();
}
}
}
@Test
public void testLimit512by1024() throws IOException, NoSuchAlgorithmException {
testLimit(512, 1024*1024);
}
@Test
public void testLimit1024By1000() throws IOException, NoSuchAlgorithmException {
testLimit(1024, 1000*1000);
}
@Test
public void testSet100() {
RBinaryStream stream = redisson.getBinaryStream("test");
byte[] bytes = new byte[100*1024*1024];
ThreadLocalRandom.current().nextBytes(bytes);
stream.set(bytes);
assertThat(stream.size()).isEqualTo(bytes.length);
assertThat(stream.get()).isEqualTo(bytes);
}
@Test
public void testSet1024() {
RBinaryStream stream = redisson.getBinaryStream("test");
byte[] bytes = new byte[1024*1024*1024];
ThreadLocalRandom.current().nextBytes(bytes);
stream.set(bytes);
assertThat(stream.size()).isEqualTo(bytes.length);
assertThat(redisson.getBucket("test:parts").isExists()).isTrue();
assertThat(redisson.getBucket("test").size()).isEqualTo(512*1024*1024);
assertThat(redisson.getBucket("test:1").size()).isEqualTo(bytes.length - 512*1024*1024);
}
@Test
public void testLimit1024By1024() throws IOException, NoSuchAlgorithmException {
testLimit(1024, 1024*1024);
}
@Test
public void testRead() throws IOException {
RBinaryStream stream = redisson.getBinaryStream("test");
byte[] value = {1, 2, 3, 4, 5, 6};
byte[] value = {1, 2, 3, 4, 5, (byte)0xFF};
stream.set(value);
InputStream s = stream.getInputStream();
@@ -29,7 +132,7 @@ public class RedissonBinaryStreamTest extends BaseTest {
i++;
}
assertThat(value).isEqualTo(readValue);
assertThat(readValue).isEqualTo(value);
}
@Test
