RBinaryStream object added. #669

pull/697/head
Nikita 8 years ago
parent 1cd1ad3186
commit fe8739194f

@ -26,6 +26,7 @@ import org.redisson.api.NodesGroup;
import org.redisson.api.RAtomicDouble;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBatch;
import org.redisson.api.RBinaryStream;
import org.redisson.api.RBitSet;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
@ -182,6 +183,11 @@ public class Redisson implements RedissonClient {
return react;
}
@Override
public RBinaryStream getBinaryStream(String name) {
return new RedissonBinaryStream(commandExecutor, name);
}
@Override
public <V> RGeo<V> getGeo(String name) {
return new RedissonGeo<V>(commandExecutor, name);

@ -0,0 +1,105 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.redisson.api.RBinaryStream;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBinaryStream {
class RedissonOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
get(commandExecutor.writeAsync(getName(), codec, RedisCommands.APPEND, getName(), new byte[] {(byte)b}));
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
byte[] dest = new byte[len];
System.arraycopy(b, off, dest, 0, len);
get(commandExecutor.writeAsync(getName(), codec, RedisCommands.APPEND, getName(), dest));
}
}
class RedissonInputStream extends InputStream {
private int index;
@Override
public int read() throws IOException {
byte[] result = (byte[])get(commandExecutor.readAsync(getName(), codec, RedisCommands.GETRANGE, getName(), index, index));
if (result.length == 0) {
return -1;
}
index++;
return result[0];
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
if (b == null) {
throw new NullPointerException();
}
return (Integer)get(commandExecutor.readAsync(getName(), codec, new RedisCommand<Integer>("GETRANGE", new Decoder<Integer>() {
@Override
public Integer decode(ByteBuf buf, State state) {
if (buf.readableBytes() == 0) {
return -1;
}
int readBytes = Math.min(buf.readableBytes(), len);
buf.readBytes(b, off, readBytes);
index += readBytes;
return readBytes;
}
}), getName(), index, index + b.length - 1));
}
}
protected RedissonBinaryStream(CommandAsyncExecutor connectionManager, String name) {
super(ByteArrayCodec.INSTANCE, connectionManager, name);
}
@Override
public InputStream getInputStream() {
return new RedissonInputStream();
}
@Override
public OutputStream getOutputStream() {
return new RedissonOutputStream();
}
}

@ -0,0 +1,33 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Binary stream holder
*
* @author Nikita Koksharov
*
*/
public interface RBinaryStream extends RBucket<byte[]> {
InputStream getInputStream();
OutputStream getOutputStream();
}

@ -31,6 +31,14 @@ import org.redisson.liveobject.provider.ResolverProvider;
*/
public interface RedissonClient {
/**
* Returns binary stream holder instance by <code>name</code>
*
* @param name of binary stream
* @return BinaryStream object
*/
RBinaryStream getBinaryStream(String name);
/**
* Returns geospatial items holder instance by <code>name</code>.
*

@ -253,6 +253,9 @@ public interface RedisCommands {
RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor());
RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor());
RedisCommand<Object> GETSET = new RedisCommand<Object>("GETSET", 2);
RedisCommand<Object> GETRANGE = new RedisCommand<Object>("GETRANGE");
RedisCommand<Object> APPEND = new RedisCommand<Object>("APPEND");
RedisCommand<Object> SETRANGE = new RedisCommand<Object>("SETRANGE");
RedisCommand<Void> SET = new RedisCommand<Void>("SET", new VoidReplayConvertor(), 2);
RedisCommand<Boolean> SETPXNX = new RedisCommand<Boolean>("SET", new BooleanNotNullReplayConvertor(), 2);
RedisCommand<Boolean> SETNX = new RedisCommand<Boolean>("SETNX", new BooleanReplayConvertor(), 2);

@ -0,0 +1,92 @@
package org.redisson;
import static org.assertj.core.api.Assertions.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.junit.Test;
import org.redisson.api.RBinaryStream;
public class RedissonBinaryStreamTest extends BaseTest {
@Test
public void testRead() throws IOException {
RBinaryStream stream = redisson.getBinaryStream("test");
byte[] value = {1, 2, 3, 4, 5, 6};
stream.set(value);
InputStream s = stream.getInputStream();
int b = 0;
byte[] readValue = new byte[6];
int i = 0;
while (true) {
b = s.read();
if (b == -1) {
break;
}
readValue[i] = (byte) b;
i++;
}
assertThat(value).isEqualTo(readValue);
}
@Test
public void testReadArray() throws IOException {
RBinaryStream stream = redisson.getBinaryStream("test");
byte[] value = {1, 2, 3, 4, 5, 6};
stream.set(value);
InputStream s = stream.getInputStream();
byte[] b = new byte[6];
assertThat(s.read(b)).isEqualTo(6);
assertThat(s.read(b)).isEqualTo(-1);
assertThat(b).isEqualTo(value);
}
@Test
public void testReadArrayWithOffset() throws IOException {
RBinaryStream stream = redisson.getBinaryStream("test");
byte[] value = {1, 2, 3, 4, 5, 6};
stream.set(value);
InputStream s = stream.getInputStream();
byte[] b = new byte[4];
assertThat(s.read(b, 1, 3)).isEqualTo(3);
byte[] valuesRead = {0, 1, 2, 3};
assertThat(b).isEqualTo(valuesRead);
}
@Test
public void testWriteArray() throws IOException {
RBinaryStream stream = redisson.getBinaryStream("test");
OutputStream os = stream.getOutputStream();
byte[] value = {1, 2, 3, 4, 5, 6};
os.write(value);
byte[] s = stream.get();
assertThat(s).isEqualTo(value);
}
@Test
public void testWriteArrayWithOffset() throws IOException {
RBinaryStream stream = redisson.getBinaryStream("test");
OutputStream os = stream.getOutputStream();
byte[] value = {1, 2, 3, 4, 5, 6};
os.write(value, 0, 3);
byte[] s = stream.get();
assertThat(s).isEqualTo(new byte[] {1, 2, 3});
os.write(value, 3, 3);
s = stream.get();
assertThat(s).isEqualTo(value);
}
}
Loading…
Cancel
Save