diff --git a/README.md b/README.md
index 2b76e6b04..a695f2533 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-Redisson: Redis based In-Memory Data Grid for Java.
State of the Art Redis Java client
+Redisson: Redis Java client and In-Memory Data Grid
====
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.10.0) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Report an issue](https://github.com/redisson/redisson/issues/new) | **[Redisson PRO](https://redisson.pro)**
diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java
index 6f5b304c9..024920701 100644
--- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java
+++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java
@@ -917,7 +917,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
popTimeout = Long.valueOf(param.toString()) / 1000;
break;
}
- if (param instanceof String) {
+ if ("BLOCK".equals(param)) {
found = true;
}
}
@@ -1080,24 +1080,19 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return;
}
- if (future.cause() instanceof RedisLoadingException) {
- async(details.isReadOnlyMode(), source, details.getCodec(),
- details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
- AsyncDetails.release(details);
- return;
- }
-
- if (future.cause() instanceof RedisTryAgainException) {
- connectionManager.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- async(details.isReadOnlyMode(), source, details.getCodec(),
- details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
-
- }
- }, 1, TimeUnit.SECONDS);
- AsyncDetails.release(details);
- return;
+ if (future.cause() instanceof RedisLoadingException
+ || future.cause() instanceof RedisTryAgainException) {
+ if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) {
+ connectionManager.newTimeout(new TimerTask() {
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ async(details.isReadOnlyMode(), source, details.getCodec(),
+ details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt() + 1, ignoreRedirect);
+ }
+ }, Math.min(connectionManager.getConfig().getTimeout(), 1000), TimeUnit.MILLISECONDS);
+ AsyncDetails.release(details);
+ return;
+ }
}
free(details.getParams());
diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java
index 4a72422f2..6d020a912 100644
--- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java
+++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java
@@ -776,20 +776,18 @@ public class CommandBatchService extends CommandAsyncService {
execute(entry, nodeSource, mainPromise, slots, attempt, options);
return;
}
- if (future.cause() instanceof RedisLoadingException) {
- entry.clearErrors();
- execute(entry, source, mainPromise, slots, attempt, options);
- return;
- }
- if (future.cause() instanceof RedisTryAgainException) {
- entry.clearErrors();
- connectionManager.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- execute(entry, source, mainPromise, slots, attempt, options);
- }
- }, 1, TimeUnit.SECONDS);
- return;
+ if (future.cause() instanceof RedisLoadingException
+ || future.cause() instanceof RedisTryAgainException) {
+ if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) {
+ entry.clearErrors();
+ connectionManager.newTimeout(new TimerTask() {
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ execute(entry, source, mainPromise, slots, attempt + 1, options);
+ }
+ }, Math.min(connectionManager.getConfig().getTimeout(), 1000), TimeUnit.MILLISECONDS);
+ return;
+ }
}
free(entry);
diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java
index 6f1460c3a..a15dfa8f3 100644
--- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java
+++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java
@@ -126,7 +126,6 @@ public class LoadBalancerManager {
return unfreeze(entry, freezeReason);
}
-
public boolean unfreeze(ClientConnectionsEntry entry, FreezeReason freezeReason) {
synchronized (entry) {
if (!entry.isFreezed()) {
@@ -138,6 +137,9 @@ public class LoadBalancerManager {
entry.resetFirstFail();
entry.setFreezed(false);
entry.setFreezeReason(null);
+
+ slaveConnectionPool.initConnections(entry);
+ pubSubConnectionPool.initConnections(entry);
return true;
}
}
diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java
index 7a96b9312..1e8236e64 100644
--- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java
+++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java
@@ -83,6 +83,12 @@ abstract class ConnectionPool {
return promise;
}
+ public RPromise initConnections(ClientConnectionsEntry entry) {
+ RPromise promise = new RedissonPromise();
+ initConnections(entry, promise, false);
+ return promise;
+ }
+
private void initConnections(final ClientConnectionsEntry entry, final RPromise initPromise, boolean checkFreezed) {
final int minimumIdleSize = getMinimumIdleSize(entry);
@@ -408,16 +414,9 @@ abstract class ConnectionPool {
}
if (future.isSuccess() && "PONG".equals(future.getNow())) {
- entry.resetFirstFail();
- RPromise promise = new RedissonPromise();
- promise.addListener(new FutureListener() {
- @Override
- public void operationComplete(Future future) throws Exception {
- masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT);
- log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
- }
- });
- initConnections(entry, promise, false);
+ if (masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT)) {
+ log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
+ }
} else {
scheduleCheck(entry);
}
diff --git a/redisson/src/main/java/org/redisson/misc/LogHelper.java b/redisson/src/main/java/org/redisson/misc/LogHelper.java
index 7eca6b5d7..2855eb1c9 100644
--- a/redisson/src/main/java/org/redisson/misc/LogHelper.java
+++ b/redisson/src/main/java/org/redisson/misc/LogHelper.java
@@ -31,7 +31,7 @@ public class LogHelper {
private static final int MAX_COLLECTION_LOG_SIZE = Integer.valueOf(System.getProperty("redisson.maxCollectionLogSize", "10"));
private static final int MAX_STRING_LOG_SIZE = Integer.valueOf(System.getProperty("redisson.maxStringLogSize", "100"));
- private static final int MAX_BYTEBUF_LOG_SIZE = Integer.valueOf(System.getProperty("redisson.maxByteBufLogSize", "10000"));
+ private static final int MAX_BYTEBUF_LOG_SIZE = Integer.valueOf(System.getProperty("redisson.maxByteBufLogSize", "1000"));
private LogHelper() {
}
@@ -53,13 +53,14 @@ public class LogHelper {
return cd.getCommand() + ", params: " + LogHelper.toString(cd.getParams());
} else if (object instanceof ByteBuf) {
final ByteBuf byteBuf = (ByteBuf) object;
- if (byteBuf.refCnt() > 0) {
- if (byteBuf.writerIndex() > MAX_BYTEBUF_LOG_SIZE) {
- return new StringBuilder(byteBuf.toString(0, MAX_BYTEBUF_LOG_SIZE, CharsetUtil.UTF_8)).append("...").toString();
- } else {
- return byteBuf.toString(0, byteBuf.writerIndex(), CharsetUtil.UTF_8);
- }
- }
+ // can't be used due to Buffer Leak error is appeared in log
+// if (byteBuf.refCnt() > 0) {
+// if (byteBuf.writerIndex() > MAX_BYTEBUF_LOG_SIZE) {
+// return new StringBuilder(byteBuf.toString(0, MAX_BYTEBUF_LOG_SIZE, CharsetUtil.UTF_8)).append("...").toString();
+// } else {
+// return byteBuf.toString(0, byteBuf.writerIndex(), CharsetUtil.UTF_8);
+// }
+// }
return byteBuf.toString();
} else {
return String.valueOf(object);
diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java
index b66ae09de..b740b6462 100644
--- a/redisson/src/test/java/org/redisson/RedissonStreamTest.java
+++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java
@@ -223,6 +223,42 @@ public class RedissonStreamTest extends BaseTest {
assertThat(s2.get("test1").keySet()).containsExactly(id12, id13);
assertThat(s2.get("test2").keySet()).containsExactly(id22, id23);
}
+
+ @Test
+ public void testReadGroupBlocking() {
+ RStream stream = redisson.getStream("test");
+
+ StreamMessageId id0 = stream.add("0", "0");
+
+ stream.createGroup("testGroup", id0);
+
+ stream.add("1", "1");
+ stream.add("2", "2");
+ stream.add("3", "3");
+
+ Map> s = stream.readGroup("testGroup", "consumer1", 3, 5, TimeUnit.SECONDS);
+ assertThat(s.values().iterator().next().keySet()).containsAnyOf("1", "2", "3");
+ assertThat(s.size()).isEqualTo(3);
+
+ stream.removeGroup("testGroup");
+
+ stream.createGroup("testGroup", id0);
+
+ stream.add("1", "1");
+ stream.add("2", "2");
+ stream.add("3", "3");
+
+ RStream stream2 = redisson.getStream("test2");
+
+ StreamMessageId id1 = stream2.add("0", "0");
+
+ stream2.createGroup("testGroup", id1);
+
+// Map>> s2 = stream.readGroup("testGroup", "consumer1", 3, 5, TimeUnit.SECONDS, id0, Collections.singletonMap("test2", id1));
+// assertThat(s2.values().iterator().next().values().iterator().next().keySet()).containsAnyOf("1", "2", "3");
+// assertThat(s2.size()).isEqualTo(3);
+ }
+
@Test
public void testReadGroup() {
@@ -355,10 +391,22 @@ public class RedissonStreamTest extends BaseTest {
t.start();
long start = System.currentTimeMillis();
- Map> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamMessageId(0));
+ Map> s = stream.read(2, 4, TimeUnit.SECONDS, new StreamMessageId(0));
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get(new StreamMessageId(1))).isEqualTo(entries1);
+
+ StreamMessageId id0 = stream.add("11", "11");
+ stream.add("22", "22");
+
+ RStream stream2 = redisson.getStream("test2");
+
+ StreamMessageId id1 = stream2.add("33", "33");
+ stream2.add("44", "44");
+
+ Map>> s2 = stream.read(5, TimeUnit.SECONDS, id0, Collections.singletonMap("test2", id1));
+ assertThat(s2.values().iterator().next().values().iterator().next().keySet()).containsAnyOf("11", "22", "33", "44");
+ assertThat(s2.keySet()).containsExactlyInAnyOrder("test", "test2");
}
@Test