diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 1b0a5ed79..e40f4c800 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -26,6 +26,7 @@ import org.redisson.api.NodesGroup; import org.redisson.api.RAtomicDouble; import org.redisson.api.RAtomicLong; import org.redisson.api.RBatch; +import org.redisson.api.RBinaryStream; import org.redisson.api.RBitSet; import org.redisson.api.RBlockingDeque; import org.redisson.api.RBlockingQueue; @@ -182,6 +183,11 @@ public class Redisson implements RedissonClient { return react; } + @Override + public RBinaryStream getBinaryStream(String name) { + return new RedissonBinaryStream(commandExecutor, name); + } + @Override public RGeo getGeo(String name) { return new RedissonGeo(commandExecutor, name); diff --git a/redisson/src/main/java/org/redisson/RedissonBinaryStream.java b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java new file mode 100644 index 000000000..000c42208 --- /dev/null +++ b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java @@ -0,0 +1,105 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.redisson.api.RBinaryStream; +import org.redisson.client.codec.ByteArrayCodec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandAsyncExecutor; + +import io.netty.buffer.ByteBuf; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonBinaryStream extends RedissonBucket implements RBinaryStream { + + class RedissonOutputStream extends OutputStream { + + @Override + public void write(int b) throws IOException { + get(commandExecutor.writeAsync(getName(), codec, RedisCommands.APPEND, getName(), new byte[] {(byte)b})); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + byte[] dest = new byte[len]; + System.arraycopy(b, off, dest, 0, len); + get(commandExecutor.writeAsync(getName(), codec, RedisCommands.APPEND, getName(), dest)); + } + + } + + class RedissonInputStream extends InputStream { + + private int index; + + @Override + public int read() throws IOException { + byte[] result = (byte[])get(commandExecutor.readAsync(getName(), codec, RedisCommands.GETRANGE, getName(), index, index)); + if (result.length == 0) { + return -1; + } + index++; + return result[0]; + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } + + return (Integer)get(commandExecutor.readAsync(getName(), codec, new RedisCommand("GETRANGE", new Decoder() { + @Override + public Integer decode(ByteBuf buf, State state) { + if (buf.readableBytes() == 0) { + return -1; + } + int readBytes = Math.min(buf.readableBytes(), len); + buf.readBytes(b, off, readBytes); + index += readBytes; + return readBytes; + } + }), getName(), index, index + b.length - 1)); + } + + } + + protected RedissonBinaryStream(CommandAsyncExecutor connectionManager, String name) { + super(ByteArrayCodec.INSTANCE, connectionManager, name); + } + + @Override + public InputStream getInputStream() { + return new RedissonInputStream(); + } + + @Override + public OutputStream getOutputStream() { + return new RedissonOutputStream(); + } + +} diff --git a/redisson/src/main/java/org/redisson/api/RBinaryStream.java b/redisson/src/main/java/org/redisson/api/RBinaryStream.java new file mode 100644 index 000000000..2bf12a488 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RBinaryStream.java @@ -0,0 +1,33 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Binary stream holder + * + * @author Nikita Koksharov + * + */ +public interface RBinaryStream extends RBucket { + + InputStream getInputStream(); + + OutputStream getOutputStream(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index 5080ba008..fc786d42a 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -31,6 +31,14 @@ import org.redisson.liveobject.provider.ResolverProvider; */ public interface RedissonClient { + /** + * Returns binary stream holder instance by name + * + * @param name of binary stream + * @return BinaryStream object + */ + RBinaryStream getBinaryStream(String name); + /** * Returns geospatial items holder instance by name. * diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 4dbcf6c18..80992c94e 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -253,6 +253,9 @@ public interface RedisCommands { RedisStrictCommand GET_LONG = new RedisStrictCommand("GET", new LongReplayConvertor()); RedisStrictCommand GET_INTEGER = new RedisStrictCommand("GET", new IntegerReplayConvertor()); RedisCommand GETSET = new RedisCommand("GETSET", 2); + RedisCommand GETRANGE = new RedisCommand("GETRANGE"); + RedisCommand APPEND = new RedisCommand("APPEND"); + RedisCommand SETRANGE = new RedisCommand("SETRANGE"); RedisCommand SET = new RedisCommand("SET", new VoidReplayConvertor(), 2); RedisCommand SETPXNX = new RedisCommand("SET", new BooleanNotNullReplayConvertor(), 2); RedisCommand SETNX = new RedisCommand("SETNX", new BooleanReplayConvertor(), 2); diff --git a/redisson/src/test/java/org/redisson/RedissonBinaryStreamTest.java b/redisson/src/test/java/org/redisson/RedissonBinaryStreamTest.java new file mode 100644 index 000000000..6e3141655 --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonBinaryStreamTest.java @@ -0,0 +1,92 @@ +package org.redisson; + +import static org.assertj.core.api.Assertions.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.junit.Test; +import org.redisson.api.RBinaryStream; + +public class RedissonBinaryStreamTest extends BaseTest { + + @Test + public void testRead() throws IOException { + RBinaryStream stream = redisson.getBinaryStream("test"); + byte[] value = {1, 2, 3, 4, 5, 6}; + stream.set(value); + + InputStream s = stream.getInputStream(); + int b = 0; + byte[] readValue = new byte[6]; + int i = 0; + while (true) { + b = s.read(); + if (b == -1) { + break; + } + readValue[i] = (byte) b; + i++; + } + + assertThat(value).isEqualTo(readValue); + } + + @Test + public void testReadArray() throws IOException { + RBinaryStream stream = redisson.getBinaryStream("test"); + byte[] value = {1, 2, 3, 4, 5, 6}; + stream.set(value); + + InputStream s = stream.getInputStream(); + byte[] b = new byte[6]; + assertThat(s.read(b)).isEqualTo(6); + assertThat(s.read(b)).isEqualTo(-1); + + assertThat(b).isEqualTo(value); + } + + @Test + public void testReadArrayWithOffset() throws IOException { + RBinaryStream stream = redisson.getBinaryStream("test"); + byte[] value = {1, 2, 3, 4, 5, 6}; + stream.set(value); + + InputStream s = stream.getInputStream(); + byte[] b = new byte[4]; + assertThat(s.read(b, 1, 3)).isEqualTo(3); + + byte[] valuesRead = {0, 1, 2, 3}; + assertThat(b).isEqualTo(valuesRead); + } + + @Test + public void testWriteArray() throws IOException { + RBinaryStream stream = redisson.getBinaryStream("test"); + OutputStream os = stream.getOutputStream(); + byte[] value = {1, 2, 3, 4, 5, 6}; + os.write(value); + + byte[] s = stream.get(); + assertThat(s).isEqualTo(value); + } + + @Test + public void testWriteArrayWithOffset() throws IOException { + RBinaryStream stream = redisson.getBinaryStream("test"); + OutputStream os = stream.getOutputStream(); + + byte[] value = {1, 2, 3, 4, 5, 6}; + os.write(value, 0, 3); + byte[] s = stream.get(); + + assertThat(s).isEqualTo(new byte[] {1, 2, 3}); + + os.write(value, 3, 3); + s = stream.get(); + + assertThat(s).isEqualTo(value); + } + + +}