refactoring

pull/5884/head
Nikita Koksharov 10 months ago
parent 86c889204a
commit 6bfd7c978d

@ -7,7 +7,7 @@ import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
@ -51,7 +51,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
RedisConnection c = factory.getConnection();
c.publish("test".getBytes(), "sdfdsf".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).until(() -> {
return counterTest.get() == 2;
});
Assertions.assertThat(counterTest2.get()).isZero();
@ -72,7 +72,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -92,7 +92,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -5,7 +5,7 @@ import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
@ -30,7 +30,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
}
@ -49,7 +49,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
@ -71,7 +71,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();

@ -4,7 +4,7 @@ import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
@ -23,7 +23,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -43,7 +43,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -214,15 +214,15 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
if (command.getConsumer() == null) {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD, params.toArray());
}
} else {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP, params.toArray());
}
}

@ -4,7 +4,7 @@ import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
@ -23,7 +23,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -43,7 +43,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -412,15 +412,15 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
if (command.getConsumer() == null) {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD, params.toArray());
}
} else {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP, params.toArray());
}
}

@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
@ -36,7 +36,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> counter.get() == 1);
}
@ -55,7 +55,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
@ -77,7 +77,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();

@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
@ -65,7 +65,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND).until(() -> {
Awaitility.await().atMost(Durations.ONE_SECOND).until(() -> {
return counterTest.get() == 3;
});
@ -85,7 +85,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -105,7 +105,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -412,15 +412,15 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
if (command.getConsumer() == null) {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD, params.toArray());
}
} else {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP, params.toArray());
}
}

@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
@ -36,7 +36,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> counter.get() == 1);
}
@ -55,7 +55,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
@ -77,7 +77,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();

@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
@ -65,7 +65,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).until(() -> {
return counterTest.get() == 3;
});
@ -85,7 +85,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -105,7 +105,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -412,15 +412,15 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
if (command.getConsumer() == null) {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD, params.toArray());
}
} else {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP, params.toArray());
}
}

@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
@ -35,7 +35,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
Awaitility.await().atMost(Duration.ONE_SECOND).untilAsserted(() -> {
Awaitility.await().atMost(Durations.ONE_SECOND).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(40);
});
}
@ -59,7 +59,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> counter.get() == 1);
}
@ -78,7 +78,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
@ -100,7 +100,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();

@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
@ -65,7 +65,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).until(() -> {
return counterTest.get() == 3;
});
@ -85,7 +85,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -105,7 +105,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -290,9 +290,9 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement
Mono<Map<byte[], byte[]>> m;
if (command.getCount() > 0) {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf, command.getCount());
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf, command.getCount());
} else {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf);
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf);
}
Mono<Map<ByteBuffer, ByteBuffer>> f = m.map(v -> v.entrySet().stream().collect(Collectors.toMap(e -> ByteBuffer.wrap(e.getKey()), e -> ByteBuffer.wrap(e.getValue()))));

@ -412,15 +412,15 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
if (command.getConsumer() == null) {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD, params.toArray());
}
} else {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP, params.toArray());
}
}

@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
@ -35,7 +35,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
Awaitility.await().atMost(Duration.ONE_SECOND).untilAsserted(() -> {
Awaitility.await().atMost(Durations.ONE_SECOND).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(40);
});
}
@ -59,7 +59,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> counter.get() == 1);
}
@ -78,7 +78,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
@ -100,7 +100,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();

@ -2,7 +2,7 @@ package org.redisson.spring.data.connection;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.redisson.RedisRunner;
import org.redisson.Redisson;
@ -106,7 +106,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
RedisConnection c = factory.getConnection();
c.publish("a".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.untilAsserted(() -> {
assertThat(msg).containsExactly("msg".getBytes());
});
@ -154,7 +154,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).until(() -> {
return counterTest.get() == 3;
});
@ -174,7 +174,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -194,7 +194,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -290,9 +290,9 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement
Mono<Map<byte[], byte[]>> m;
if (command.getCount() > 0) {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf, command.getCount());
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf, command.getCount());
} else {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf);
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf);
}
Mono<Map<ByteBuffer, ByteBuffer>> f = m.map(v -> v.entrySet().stream().collect(Collectors.toMap(e -> ByteBuffer.wrap(e.getKey()), e -> ByteBuffer.wrap(e.getValue()))));

@ -412,15 +412,15 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
if (command.getConsumer() == null) {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD, params.toArray());
}
} else {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP, params.toArray());
}
}

@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
@ -35,7 +35,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
Awaitility.await().atMost(Duration.ONE_SECOND).untilAsserted(() -> {
Awaitility.await().atMost(Durations.ONE_SECOND).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(40);
});
}
@ -59,7 +59,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> counter.get() == 1);
}
@ -78,7 +78,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
@ -100,7 +100,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();

@ -1,7 +1,7 @@
package org.redisson.spring.data.connection;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.RedisRunner;
@ -163,7 +163,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
factory.getConnection().setEx("test:key2".getBytes(), 3, "123".getBytes());
factory.getConnection().setEx("test:key1".getBytes(), 3, "123".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).untilAsserted(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).untilAsserted(() -> {
assertThat(names).containsExactlyInAnyOrder("EG:test:key1", "test:key2", "test:key1");
});
@ -191,7 +191,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
RedisConnection c = factory.getConnection();
c.publish("a".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.untilAsserted(() -> {
assertThat(msg).containsExactly("msg".getBytes());
});
@ -239,7 +239,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).until(() -> {
return counterTest.get() == 3;
});
@ -259,7 +259,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -279,7 +279,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -290,9 +290,9 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement
Mono<Map<byte[], byte[]>> m;
if (command.getCount() > 0) {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf, command.getCount());
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf, command.getCount());
} else {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf);
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf);
}
Mono<Map<ByteBuffer, ByteBuffer>> f = m.map(v -> v.entrySet().stream().collect(Collectors.toMap(e -> ByteBuffer.wrap(e.getKey()), e -> ByteBuffer.wrap(e.getValue()))));

@ -412,15 +412,15 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
if (command.getConsumer() == null) {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD, params.toArray());
}
} else {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP, params.toArray());
}
}

@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
@ -35,7 +35,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
Awaitility.await().atMost(Duration.ONE_SECOND).untilAsserted(() -> {
Awaitility.await().atMost(Durations.ONE_SECOND).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(40);
});
}
@ -59,7 +59,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> counter.get() == 1);
}
@ -78,7 +78,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
@ -100,7 +100,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();

@ -1,7 +1,7 @@
package org.redisson.spring.data.connection;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.RedisRunner;
@ -105,7 +105,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
factory.getConnection().setEx("test:key2".getBytes(), 3, "123".getBytes());
factory.getConnection().setEx("test:key1".getBytes(), 3, "123".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).untilAsserted(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).untilAsserted(() -> {
assertThat(names).containsExactlyInAnyOrder("EG:test:key1", "test:key2", "test:key1");
});
@ -133,7 +133,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
RedisConnection c = factory.getConnection();
c.publish("a".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.untilAsserted(() -> {
assertThat(msg).containsExactly("msg".getBytes());
});
@ -181,7 +181,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).until(() -> {
return counterTest.get() == 3;
});
@ -201,7 +201,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -221,7 +221,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -290,9 +290,9 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement
Mono<Map<byte[], byte[]>> m;
if (command.getCount() > 0) {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf, command.getCount());
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf, command.getCount());
} else {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf);
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf);
}
Mono<Map<ByteBuffer, ByteBuffer>> f = m.map(v -> v.entrySet().stream().collect(Collectors.toMap(e -> ByteBuffer.wrap(e.getKey()), e -> ByteBuffer.wrap(e.getValue()))));

@ -412,15 +412,15 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
if (command.getConsumer() == null) {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD, params.toArray());
}
} else {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP, params.toArray());
}
}

@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveSubscription;
@ -35,7 +35,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
}
Awaitility.await().atMost(Duration.ONE_SECOND).untilAsserted(() -> {
Awaitility.await().atMost(Durations.ONE_SECOND).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(40);
});
}
@ -59,7 +59,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
ReactiveRedisConnection connection = factory.getReactiveConnection();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> counter.get() == 1);
}
@ -78,7 +78,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();
@ -100,7 +100,7 @@ public class RedissonSubscribeReactiveTest extends BaseConnectionTest {
}).subscribe();
connection.pubSubCommands().publish(ByteBuffer.wrap("test".getBytes()), ByteBuffer.wrap("msg".getBytes())).block();
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
ss.unsubscribe();

@ -1,7 +1,7 @@
package org.redisson.spring.data.connection;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.Durations;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.RedisRunner;
@ -105,7 +105,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
factory.getConnection().setEx("test:key2".getBytes(), 3, "123".getBytes());
factory.getConnection().setEx("test:key1".getBytes(), 3, "123".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).untilAsserted(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).untilAsserted(() -> {
assertThat(names).containsExactlyInAnyOrder("EG:test:key1", "test:key2", "test:key1");
});
@ -133,7 +133,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
RedisConnection c = factory.getConnection();
c.publish("a".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.untilAsserted(() -> {
assertThat(msg).containsExactly("msg".getBytes());
});
@ -181,7 +181,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
c.set("mykey".getBytes(), "2".getBytes());
c.del("mykey".getBytes());
Awaitility.await().atMost(Duration.FIVE_SECONDS).until(() -> {
Awaitility.await().atMost(Durations.FIVE_SECONDS).until(() -> {
return counterTest.get() == 3;
});
@ -201,7 +201,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();
@ -221,7 +221,7 @@ public class RedissonSubscribeTest extends BaseConnectionTest {
}, "test".getBytes());
connection.publish("test".getBytes(), "msg".getBytes());
Awaitility.await().atMost(Duration.ONE_SECOND)
Awaitility.await().atMost(Durations.ONE_SECOND)
.until(() -> Arrays.equals("msg".getBytes(), msg.get()));
connection.getSubscription().unsubscribe();

@ -290,9 +290,9 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement
Mono<Map<byte[], byte[]>> m;
if (command.getCount() > 0) {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf, command.getCount());
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf, command.getCount());
} else {
m = read(keyBuf, ByteArrayCodec.INSTANCE, executorService.getServiceManager().getHRandomFieldCommand(), keyBuf);
m = read(keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HRANDFIELD, keyBuf);
}
Mono<Map<ByteBuffer, ByteBuffer>> f = m.map(v -> v.entrySet().stream().collect(Collectors.toMap(e -> ByteBuffer.wrap(e.getKey()), e -> ByteBuffer.wrap(e.getValue()))));

@ -412,15 +412,15 @@ public class RedissonReactiveStreamCommands extends RedissonBaseReactive impleme
if (command.getConsumer() == null) {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREAD, params.toArray());
}
} else {
if (command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0) {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
} else {
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, executorService.getServiceManager().getXReadGroupCommand(), params.toArray());
m = read(toByteArray(command.getStreamOffsets().get(0).getKey()), ByteArrayCodec.INSTANCE, RedisCommands.XREADGROUP, params.toArray());
}
}

@ -617,7 +617,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Map<K, V>> randomEntriesAsync(int count) {
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getHRandomFieldCommand(), getRawName(), count, "WITHVALUES");
return commandExecutor.readAsync(getRawName(), codec, RedisCommands.HRANDFIELD, getRawName(), count, "WITHVALUES");
}
@Override

@ -243,9 +243,9 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
if (rp.getTimeout() != null) {
return commandExecutor.writeAsync(getRawName(), codec, getServiceManager().getXReadGroupBlockingCommand(), params.toArray());
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.XREADGROUP_BLOCKING, params.toArray());
}
return commandExecutor.writeAsync(getRawName(), codec, getServiceManager().getXReadGroupCommand(), params.toArray());
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.XREADGROUP, params.toArray());
}
@Override
@ -281,9 +281,9 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
if (rp.getTimeout() != null) {
return commandExecutor.writeAsync(getRawName(), codec, getServiceManager().getXReadGroupBlockingSingleCommand(), params.toArray());
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.XREADGROUP_BLOCKING_SINGLE, params.toArray());
}
return commandExecutor.writeAsync(getRawName(), codec, getServiceManager().getXReadGroupSingleCommand(), params.toArray());
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.XREADGROUP_SINGLE, params.toArray());
}
@Override
@ -395,9 +395,9 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
if (rp.getTimeout() != null) {
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getXReadBlockingCommand(), params.toArray());
return commandExecutor.readAsync(getRawName(), codec, RedisCommands.XREAD_BLOCKING, params.toArray());
}
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getXReadCommand(), params.toArray());
return commandExecutor.readAsync(getRawName(), codec, RedisCommands.XREAD, params.toArray());
}
@Override
@ -425,9 +425,9 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
params.add(rp.getId1());
if (rp.getTimeout() != null) {
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getXReadBlockingSingleCommand(), params.toArray());
return commandExecutor.readAsync(getRawName(), codec, RedisCommands.XREAD_BLOCKING_SINGLE, params.toArray());
}
return commandExecutor.readAsync(getRawName(), codec, getServiceManager().getXReadSingleCommand(), params.toArray());
return commandExecutor.readAsync(getRawName(), codec, RedisCommands.XREAD_SINGLE, params.toArray());
}
@Override

@ -602,10 +602,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec,
RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
if (readOnlyMode && command.getName().equals("SORT") && !SORT_RO_SUPPORTED.get()) {
RedisCommand<V> cmnd = getServiceManager().resp3(command);
if (readOnlyMode && cmnd.getName().equals("SORT") && !SORT_RO_SUPPORTED.get()) {
readOnlyMode = false;
} else if (readOnlyMode && command.getName().equals("SORT") && SORT_RO_SUPPORTED.get()) {
RedisCommand cmd = new RedisCommand("SORT_RO", command.getReplayMultiDecoder());
} else if (readOnlyMode && cmnd.getName().equals("SORT") && SORT_RO_SUPPORTED.get()) {
RedisCommand cmd = new RedisCommand("SORT_RO", cmnd.getReplayMultiDecoder());
CompletableFuture<R> mainPromise = createPromise();
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, cmd, params, mainPromise,
ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry,
@ -625,7 +627,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
CompletableFuture<R> mainPromise = createPromise();
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise,
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, cmnd, params, mainPromise,
ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry,
retryAttempts, retryInterval, responseTimeout, trackChanges);
executor.execute();

@ -48,7 +48,6 @@ import org.redisson.RedissonShutdownException;
import org.redisson.Version;
import org.redisson.api.NatMapper;
import org.redisson.api.RFuture;
import org.redisson.api.StreamMessageId;
import org.redisson.cache.LRUCacheMap;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.codec.Codec;
@ -568,67 +567,25 @@ public final class ServiceManager {
return cfg.getProtocol() == Protocol.RESP3;
}
public RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> getXReadGroupBlockingCommand() {
if (isResp3()) {
return RedisCommands.XREADGROUP_BLOCKING_V2;
}
return RedisCommands.XREADGROUP_BLOCKING;
}
public RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> getXReadGroupCommand() {
if (isResp3()) {
return RedisCommands.XREADGROUP_V2;
}
return RedisCommands.XREADGROUP;
}
public RedisCommand<Map<StreamMessageId, Map<Object, Object>>> getXReadGroupBlockingSingleCommand() {
if (isResp3()) {
return RedisCommands.XREADGROUP_BLOCKING_SINGLE_V2;
}
return RedisCommands.XREADGROUP_BLOCKING_SINGLE;
}
public RedisCommand<Map<StreamMessageId, Map<Object, Object>>> getXReadGroupSingleCommand() {
if (isResp3()) {
return RedisCommands.XREADGROUP_SINGLE_V2;
}
return RedisCommands.XREADGROUP_SINGLE;
}
private static final Map<RedisCommand<?>, RedisCommand<?>> RESP3MAPPING = new HashMap<>();
public RedisCommand<Map<StreamMessageId, Map<Object, Object>>> getXReadBlockingSingleCommand() {
if (isResp3()) {
return RedisCommands.XREAD_BLOCKING_SINGLE_V2;
}
return RedisCommands.XREAD_BLOCKING_SINGLE;
}
public RedisCommand<Map<StreamMessageId, Map<Object, Object>>> getXReadSingleCommand() {
if (isResp3()) {
return RedisCommands.XREAD_SINGLE_V2;
}
return RedisCommands.XREAD_SINGLE;
}
public RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> getXReadBlockingCommand() {
if (isResp3()) {
return RedisCommands.XREAD_BLOCKING_V2;
}
return RedisCommands.XREAD_BLOCKING;
}
public RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> getXReadCommand() {
if (isResp3()) {
return RedisCommands.XREAD_V2;
}
return RedisCommands.XREAD;
static {
RESP3MAPPING.put(RedisCommands.XREADGROUP_BLOCKING, RedisCommands.XREADGROUP_BLOCKING_V2);
RESP3MAPPING.put(RedisCommands.XREADGROUP, RedisCommands.XREADGROUP_V2);
RESP3MAPPING.put(RedisCommands.XREADGROUP_BLOCKING_SINGLE, RedisCommands.XREADGROUP_BLOCKING_SINGLE_V2);
RESP3MAPPING.put(RedisCommands.XREADGROUP_SINGLE, RedisCommands.XREADGROUP_SINGLE_V2);
RESP3MAPPING.put(RedisCommands.XREAD_BLOCKING_SINGLE, RedisCommands.XREAD_BLOCKING_SINGLE_V2);
RESP3MAPPING.put(RedisCommands.XREAD_SINGLE, RedisCommands.XREAD_SINGLE_V2);
RESP3MAPPING.put(RedisCommands.XREAD_BLOCKING, RedisCommands.XREAD_BLOCKING_V2);
RESP3MAPPING.put(RedisCommands.XREAD, RedisCommands.XREAD_V2);
RESP3MAPPING.put(RedisCommands.HRANDFIELD, RedisCommands.HRANDFIELD_V2);
}
public RedisCommand<Map<Object, Object>> getHRandomFieldCommand() {
public <R> RedisCommand<R> resp3(RedisCommand<R> command) {
if (isResp3()) {
return RedisCommands.HRANDFIELD_V2;
return (RedisCommand<R>) RESP3MAPPING.getOrDefault(command, command);
}
return RedisCommands.HRANDFIELD;
return command;
}
public Map<String, ResponseEntry> getResponses() {

Loading…
Cancel
Save